SpringBoot 3.x Kafka Integration in Detail
🎯 Kafka Overview
Apache Kafka is a distributed event-streaming platform used primarily to build real-time data pipelines and streaming applications. It is known for high throughput, low latency, scalability, and fault tolerance.
Core Concepts
- Producer: publishes messages to the Kafka cluster
- Consumer: reads messages from the Kafka cluster
- Topic: a named category of messages that producers write to and consumers read from
- Partition: a physical subdivision of a topic; more partitions allow more parallel processing (see the sketch after this list)
- Broker: a server node in the Kafka cluster
- Consumer Group: a set of consumers that share the work of consuming a topic, with each partition assigned to exactly one member of the group at a time
- Offset: the position of a message within a partition
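To make Topic, Partition, and Broker concrete, here is a minimal sketch that creates the user-events topic used later in this article with three partitions on a local broker, using Kafka's AdminClient. The partition and replication counts are example values, not recommendations:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        // Connect to the broker (assumes a local single-node cluster)
        try (AdminClient admin = AdminClient.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // 3 partitions for parallel consumption; replication factor 1 (single broker)
            admin.createTopics(List.of(new NewTopic("user-events", 3, (short) 1)))
                 .all().get();
        }
    }
}
```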
Core Features
- High throughput: handles millions of messages per second
- Low latency: millisecond-level delivery
- Durability: messages are persisted to disk
- Distributed: supports clustered deployment and horizontal scaling
- Fault tolerance: data replication and failure recovery
🚀 Quick Start
1. Add Dependencies
Versions for all of the starters below are managed by the Spring Boot parent POM (or the spring-boot-dependencies BOM), so none need to be pinned explicitly:
```xml
<dependencies>
    <!-- Spring Boot Web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- JSON processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- Bean validation -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    <!-- Test dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Kafka test support -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```
2. Configure Kafka (application.yml)
```yaml
spring:
  kafka:
    # Kafka broker addresses
    bootstrap-servers: localhost:9092
    # Producer settings
    producer:
      # Number of retries on transient send failures
      retries: 3
      # Batch size in bytes
      batch-size: 16384
      # Total producer buffer memory in bytes
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Wait for all in-sync replicas to acknowledge
      acks: all
      # Compression type
      compression-type: gzip
      # Send timeouts
      properties:
        delivery.timeout.ms: 120000
        request.timeout.ms: 30000
    # Consumer settings
    consumer:
      # Consumer group ID
      group-id: demo-group
      # Offsets are committed manually
      enable-auto-commit: false
      # Auto-commit interval (ignored here because auto-commit is disabled)
      auto-commit-interval: 1000
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Start from the earliest message when the group has no committed offset
      auto-offset-reset: earliest
      # Maximum records per poll
      max-poll-records: 500
      # Maximum time the broker may block a fetch (ms)
      fetch-max-wait: 500
      # JSON deserialization settings
      properties:
        spring.json.trusted.packages: "com.example.demo.dto"
        spring.json.type.mapping: "userEvent:com.example.demo.dto.UserEventDto,orderEvent:com.example.demo.dto.OrderEventDto"
    # Listener settings
    listener:
      # Acknowledgment mode
      ack-mode: manual_immediate
      # Number of listener threads
      concurrency: 3
      # Poll timeout (ms)
      poll-timeout: 3000
      # Listener type (batch delivers a List of records per call).
      # Note: these listener settings apply to Boot's auto-configured factory;
      # the custom factories defined in the next section override them.
      type: batch

# Logging
logging:
  level:
    org.springframework.kafka: DEBUG
    org.apache.kafka: DEBUG
```
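The topics used in this article (user-events, order-events, notifications) must exist before messages are sent. Instead of the raw AdminClient shown earlier, they can be declared as beans that Spring Boot's auto-configured KafkaAdmin creates at startup if they are missing. A minimal sketch; partition and replica counts are example values:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // KafkaAdmin (auto-configured by Spring Boot) creates these topics
    // on startup if they do not already exist.
    @Bean
    public NewTopic userEventsTopic() {
        return TopicBuilder.name("user-events").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic orderEventsTopic() {
        return TopicBuilder.name("order-events").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic notificationsTopic() {
        return TopicBuilder.name("notifications").partitions(1).replicas(1).build();
    }
}
```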
🔧 Kafka Configuration Class
```java
package com.example.demo.config;

import com.example.demo.dto.UserEventDto;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    /**
     * Producer configuration.
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * KafkaTemplate for sending messages.
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Default consumer configuration.
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // JSON deserialization: restrict deserializable packages
        // and map type ids to concrete classes
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.demo.dto");
        props.put(JsonDeserializer.TYPE_MAPPINGS,
                "userEvent:com.example.demo.dto.UserEventDto,orderEvent:com.example.demo.dto.OrderEventDto");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Default listener container factory.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent listener threads
        factory.setConcurrency(3);
        // Manual, immediate offset commits
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Default error handler (retries with back-off, then logs)
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }

    /**
     * Consumer factory dedicated to user events.
     */
    @Bean
    public ConsumerFactory<String, UserEventDto> userEventConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-event-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.demo.dto");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserEventDto.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory dedicated to user events.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserEventDto> userEventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserEventDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userEventConsumerFactory());
        factory.setConcurrency(2);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```
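The batch listener in the consumer section below receives a List of records per call, which only works against a container factory with batch mode enabled; the class above does not define one. A minimal sketch of such a factory to add to KafkaConfig (the bean name batchKafkaListenerContainerFactory is our own choice, referenced by the batch listener later):

```java
/**
 * Batch listener container factory (add to KafkaConfig).
 * Required by listeners that take a List of records per invocation.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> batchKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Deliver records to the listener as a List instead of one at a time
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}
```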
📊 Message DTO Classes
1. User Event DTO
```java
package com.example.demo.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import java.time.LocalDateTime;

public class UserEventDto {

    @NotBlank(message = "eventId must not be blank")
    private String eventId;

    @NotBlank(message = "eventType must not be blank")
    private String eventType; // CREATE, UPDATE, DELETE

    @NotNull(message = "userId must not be null")
    private Long userId;

    private String username;
    private String email;
    private String operation;
    private String operatorId;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    private Object data; // Optional extra payload

    // Constructors
    public UserEventDto() {
        this.timestamp = LocalDateTime.now();
    }

    public UserEventDto(String eventId, String eventType, Long userId, String operation) {
        this();
        this.eventId = eventId;
        this.eventType = eventType;
        this.userId = userId;
        this.operation = operation;
    }

    // Getters and setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    public String getOperation() { return operation; }
    public void setOperation(String operation) { this.operation = operation; }
    public String getOperatorId() { return operatorId; }
    public void setOperatorId(String operatorId) { this.operatorId = operatorId; }
    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
    public Object getData() { return data; }
    public void setData(Object data) { this.data = data; }

    @Override
    public String toString() {
        return "UserEventDto{" +
                "eventId='" + eventId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", userId=" + userId +
                ", username='" + username + '\'' +
                ", operation='" + operation + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
```
2. Order Event DTO
```java
package com.example.demo.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;

public class OrderEventDto {

    @NotBlank(message = "eventId must not be blank")
    private String eventId;

    @NotBlank(message = "eventType must not be blank")
    private String eventType; // CREATED, PAID, SHIPPED, DELIVERED, CANCELLED

    @NotNull(message = "orderId must not be null")
    private Long orderId;

    @NotNull(message = "userId must not be null")
    private Long userId;

    private String orderNo;
    private BigDecimal totalAmount;
    private String status;
    private List<OrderItem> items;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    // Order line item
    public static class OrderItem {
        private Long productId;
        private String productName;
        private Integer quantity;
        private BigDecimal price;

        // Constructors
        public OrderItem() {}

        public OrderItem(Long productId, String productName, Integer quantity, BigDecimal price) {
            this.productId = productId;
            this.productName = productName;
            this.quantity = quantity;
            this.price = price;
        }

        // Getters and setters
        public Long getProductId() { return productId; }
        public void setProductId(Long productId) { this.productId = productId; }
        public String getProductName() { return productName; }
        public void setProductName(String productName) { this.productName = productName; }
        public Integer getQuantity() { return quantity; }
        public void setQuantity(Integer quantity) { this.quantity = quantity; }
        public BigDecimal getPrice() { return price; }
        public void setPrice(BigDecimal price) { this.price = price; }
    }

    // Constructors
    public OrderEventDto() {
        this.timestamp = LocalDateTime.now();
    }

    public OrderEventDto(String eventId, String eventType, Long orderId, Long userId) {
        this();
        this.eventId = eventId;
        this.eventType = eventType;
        this.orderId = orderId;
        this.userId = userId;
    }

    // Getters and setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public Long getOrderId() { return orderId; }
    public void setOrderId(Long orderId) { this.orderId = orderId; }
    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getOrderNo() { return orderNo; }
    public void setOrderNo(String orderNo) { this.orderNo = orderNo; }
    public BigDecimal getTotalAmount() { return totalAmount; }
    public void setTotalAmount(BigDecimal totalAmount) { this.totalAmount = totalAmount; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public List<OrderItem> getItems() { return items; }
    public void setItems(List<OrderItem> items) { this.items = items; }
    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "OrderEventDto{" +
                "eventId='" + eventId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", orderId=" + orderId +
                ", userId=" + userId +
                ", orderNo='" + orderNo + '\'' +
                ", totalAmount=" + totalAmount +
                ", status='" + status + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
```
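Since Spring Boot 3.x requires Java 17, these DTOs could also be written as records, which the Jackson version shipped with Boot 3 serializes without extra setup. A hypothetical sketch, not used by the rest of this article (field set abbreviated):

```java
package com.example.demo.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import java.time.LocalDateTime;

// Compact record-based alternative; validation annotations
// apply to the record components.
public record UserEvent(
        @NotBlank String eventId,
        @NotBlank String eventType,
        @NotNull Long userId,
        String username,
        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") LocalDateTime timestamp) {
}
```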
📤 Message Producer
```java
package com.example.demo.service;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Topic name constants
    public static final String USER_EVENTS_TOPIC = "user-events";
    public static final String ORDER_EVENTS_TOPIC = "order-events";
    public static final String NOTIFICATION_TOPIC = "notifications";

    /**
     * Send a user event. The user ID is the message key, so all events
     * for the same user land on the same partition and stay in order.
     */
    public void sendUserEvent(UserEventDto userEvent) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(USER_EVENTS_TOPIC, userEvent.getUserId().toString(), userEvent);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("User event sent: " + userEvent.getEventId()
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send user event: " + userEvent.getEventId()
                            + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending user event: " + e.getMessage());
        }
    }

    /**
     * Send an order event, keyed by order ID.
     */
    public void sendOrderEvent(OrderEventDto orderEvent) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(ORDER_EVENTS_TOPIC, orderEvent.getOrderId().toString(), orderEvent);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Order event sent: " + orderEvent.getEventId()
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send order event: " + orderEvent.getEventId()
                            + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending order event: " + e.getMessage());
        }
    }

    /**
     * Send a notification message with a random key.
     */
    public void sendNotification(String message) {
        try {
            String messageId = UUID.randomUUID().toString();
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(NOTIFICATION_TOPIC, messageId, message);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Notification sent: " + messageId
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send notification: " + messageId
                            + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending notification: " + e.getMessage());
        }
    }

    /**
     * Send a message to a specific partition.
     */
    public void sendMessageToPartition(String topic, int partition, String key, Object message) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(topic, partition, key, message);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Message sent to partition " + partition
                            + " offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send message to partition: " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending partitioned message: " + e.getMessage());
        }
    }

    /**
     * Send a batch of messages, then flush the producer buffer.
     */
    public void sendBatchMessages(String topic, List<Object> messages) {
        messages.forEach(message -> {
            String key = UUID.randomUUID().toString();
            kafkaTemplate.send(topic, key, message);
        });
        // Flush the buffer so the whole batch is sent immediately
        kafkaTemplate.flush();
        System.out.println("Batch of " + messages.size() + " messages sent");
    }
}
```
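All of the sends above are asynchronous. When the caller must know the outcome before returning, the CompletableFuture returned by send() can be awaited with a timeout. A minimal sketch of a blocking variant that could be added to KafkaProducerService (the method name is our own):

```java
/**
 * Blocking variant: wait up to 10 seconds for the broker acknowledgment.
 * Throws on timeout or send failure, so the caller can react directly.
 */
public SendResult<String, Object> sendUserEventSync(UserEventDto userEvent) throws Exception {
    return kafkaTemplate
            .send(USER_EVENTS_TOPIC, userEvent.getUserId().toString(), userEvent)
            .get(10, java.util.concurrent.TimeUnit.SECONDS);
}
```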
📥 Message Consumer
```java
package com.example.demo.service;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class KafkaConsumerService {

    /**
     * Consume user events.
     */
    @KafkaListener(topics = "user-events", groupId = "user-event-group",
            containerFactory = "userEventKafkaListenerContainerFactory")
    public void consumeUserEvent(@Payload UserEventDto userEvent,
                                 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                                 @Header(KafkaHeaders.OFFSET) long offset,
                                 Acknowledgment acknowledgment) {
        try {
            System.out.println("Received user event: " + userEvent);
            System.out.println("Topic: " + topic + ", Partition: " + partition + ", Offset: " + offset);

            // Business logic for the user event
            processUserEvent(userEvent);

            // Manually acknowledge the record
            acknowledgment.acknowledge();
            System.out.println("User event processed: " + userEvent.getEventId());
        } catch (Exception e) {
            System.err.println("Failed to process user event: " + e.getMessage());
            // Retry logic or routing to a dead-letter topic could go here
        }
    }

    /**
     * Consume order events.
     */
    @KafkaListener(topics = "order-events", groupId = "order-event-group")
    public void consumeOrderEvent(ConsumerRecord<String, OrderEventDto> record,
                                  Acknowledgment acknowledgment) {
        try {
            OrderEventDto orderEvent = record.value();
            System.out.println("Received order event: " + orderEvent);
            System.out.println("Key: " + record.key() + ", Partition: " + record.partition()
                    + ", Offset: " + record.offset());

            // Business logic for the order event
            processOrderEvent(orderEvent);

            acknowledgment.acknowledge();
            System.out.println("Order event processed: " + orderEvent.getEventId());
        } catch (Exception e) {
            System.err.println("Failed to process order event: " + e.getMessage());
        }
    }

    /**
     * Consume notification messages.
     * Note: in Spring Kafka 3.x (used by Spring Boot 3.x) the key header
     * constant is RECEIVED_KEY; RECEIVED_MESSAGE_KEY was removed.
     */
    @KafkaListener(topics = "notifications", groupId = "notification-group")
    public void consumeNotification(@Payload String message,
                                    @Header(KafkaHeaders.RECEIVED_KEY) String key,
                                    Acknowledgment acknowledgment) {
        try {
            System.out.println("Received notification: " + message);
            System.out.println("Message Key: " + key);

            // Business logic for the notification
            processNotification(message);

            acknowledgment.acknowledge();
            System.out.println("Notification processed");
        } catch (Exception e) {
            System.err.println("Failed to process notification: " + e.getMessage());
        }
    }

    /**
     * Consume messages in batches. This listener receives a List per call,
     * so it must use the batch-enabled container factory defined in the
     * configuration section (batchKafkaListenerContainerFactory).
     */
    @KafkaListener(topics = "batch-topic", groupId = "batch-group",
            containerFactory = "batchKafkaListenerContainerFactory")
    public void consumeBatchMessages(List<ConsumerRecord<String, Object>> records,
                                     Acknowledgment acknowledgment) {
        try {
            System.out.println("Received batch of " + records.size() + " messages");

            for (ConsumerRecord<String, Object> record : records) {
                System.out.println("Processing message: Key=" + record.key()
                        + ", Value=" + record.value()
                        + ", Partition=" + record.partition()
                        + ", Offset=" + record.offset());
                // Process one record
                processBatchMessage(record.value());
            }

            // Acknowledge the whole batch at once
            acknowledgment.acknowledge();
            System.out.println("Batch processed");
        } catch (Exception e) {
            System.err.println("Failed to process batch: " + e.getMessage());
        }
    }

    /**
     * Consume from multiple topics with a single listener.
     */
    @KafkaListener(topics = {"user-events", "order-events"}, groupId = "multi-topic-group")
    public void consumeMultiTopicEvents(ConsumerRecord<String, Object> record,
                                        Acknowledgment acknowledgment) {
        try {
            String topic = record.topic();
            Object value = record.value();
            System.out.println("Received multi-topic message: Topic=" + topic + ", Value=" + value);

            // Dispatch by topic
            switch (topic) {
                case "user-events":
                    if (value instanceof UserEventDto) {
                        processUserEvent((UserEventDto) value);
                    }
                    break;
                case "order-events":
                    if (value instanceof OrderEventDto) {
                        processOrderEvent((OrderEventDto) value);
                    }
                    break;
                default:
                    System.out.println("Unknown topic: " + topic);
            }

            acknowledgment.acknowledge();
        } catch (Exception e) {
            System.err.println("Failed to process multi-topic message: " + e.getMessage());
        }
    }

    // Business handlers

    private void processUserEvent(UserEventDto userEvent) {
        switch (userEvent.getEventType()) {
            case "CREATE":
                System.out.println("Handling user-created event: " + userEvent.getUserId());
                // Send welcome email, initialize user data, etc.
                break;
            case "UPDATE":
                System.out.println("Handling user-updated event: " + userEvent.getUserId());
                // Sync user data to other systems
                break;
            case "DELETE":
                System.out.println("Handling user-deleted event: " + userEvent.getUserId());
                // Clean up user-related data
                break;
            default:
                System.out.println("Unknown user event type: " + userEvent.getEventType());
        }
    }

    private void processOrderEvent(OrderEventDto orderEvent) {
        switch (orderEvent.getEventType()) {
            case "CREATED":
                System.out.println("Handling order-created event: " + orderEvent.getOrderId());
                // Reserve stock, send confirmation email, etc.
                break;
            case "PAID":
                System.out.println("Handling order-paid event: " + orderEvent.getOrderId());
                // Update order status, prepare shipment, etc.
                break;
            case "SHIPPED":
                System.out.println("Handling order-shipped event: " + orderEvent.getOrderId());
                // Send tracking info, update status, etc.
                break;
            case "DELIVERED":
                System.out.println("Handling order-delivered event: " + orderEvent.getOrderId());
                // Confirm receipt, prompt for a review, etc.
                break;
            case "CANCELLED":
                System.out.println("Handling order-cancelled event: " + orderEvent.getOrderId());
                // Refund, roll back stock, etc.
                break;
            default:
                System.out.println("Unknown order event type: " + orderEvent.getEventType());
        }
    }

    private void processNotification(String message) {
        System.out.println("Handling notification: " + message);
        // Send email, SMS, push notification, etc.
    }

    private void processBatchMessage(Object message) {
        System.out.println("Handling batch message item: " + message);
        // Batch processing logic
    }
}
```
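The catch blocks above only log failures, so a bad record is effectively skipped. For retry-then-dead-letter behavior, Spring Kafka offers the @RetryableTopic annotation, which retries through automatically created retry topics and finally routes exhausted records to a dead-letter topic. A minimal sketch under those assumptions (the class, group name, and listener bodies are our own example values):

```java
package com.example.demo.service;

import com.example.demo.dto.UserEventDto;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Service;

@Service
public class RetryableUserEventConsumer {

    // 3 attempts total, 1 second apart; records that still fail are routed
    // to an automatically created dead-letter topic (user-events-dlt by default).
    @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000))
    @KafkaListener(topics = "user-events", groupId = "user-event-retry-group")
    public void consume(UserEventDto event) {
        // Throwing an exception here sends the record to the next retry topic
        System.out.println("Processing user event: " + event.getEventId());
    }

    @DltHandler
    public void handleDeadLetter(UserEventDto event) {
        System.err.println("User event landed in the dead-letter topic: " + event.getEventId());
    }
}
```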
🎮 Controller Layer
```java
package com.example.demo.controller;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import com.example.demo.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import jakarta.validation.Valid;
import java.math.BigDecimal;
import java.util.*;

@RestController
@RequestMapping("/api/kafka")
@CrossOrigin(origins = "*")
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Send a user event.
     */
    @PostMapping("/user-events")
    public ResponseEntity<Map<String, String>> sendUserEvent(@RequestBody @Valid UserEventDto userEvent) {
        kafkaProducerService.sendUserEvent(userEvent);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "User event sent");
        response.put("eventId", userEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Send an order event.
     */
    @PostMapping("/order-events")
    public ResponseEntity<Map<String, String>> sendOrderEvent(@RequestBody @Valid OrderEventDto orderEvent) {
        kafkaProducerService.sendOrderEvent(orderEvent);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Order event sent");
        response.put("eventId", orderEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Send a notification message.
     */
    @PostMapping("/notifications")
    public ResponseEntity<Map<String, String>> sendNotification(@RequestBody Map<String, String> request) {
        String message = request.get("message");
        kafkaProducerService.sendNotification(message);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Notification sent");
        return ResponseEntity.ok(response);
    }

    /**
     * Build and send a user event from a few fields.
     */
    @PostMapping("/quick/user-event")
    public ResponseEntity<Map<String, String>> quickUserEvent(@RequestBody Map<String, Object> request) {
        String eventType = (String) request.get("eventType");
        Long userId = Long.valueOf(request.get("userId").toString());
        String username = (String) request.get("username");
        String email = (String) request.get("email");

        UserEventDto userEvent = new UserEventDto(
                UUID.randomUUID().toString(),
                eventType,
                userId,
                "API_OPERATION");
        userEvent.setUsername(username);
        userEvent.setEmail(email);
        userEvent.setOperatorId("system");

        kafkaProducerService.sendUserEvent(userEvent);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "User event created and sent");
        response.put("eventId", userEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Build and send an order event from a few fields.
     */
    @PostMapping("/quick/order-event")
    public ResponseEntity<Map<String, String>> quickOrderEvent(@RequestBody Map<String, Object> request) {
        String eventType = (String) request.get("eventType");
        Long orderId = Long.valueOf(request.get("orderId").toString());
        Long userId = Long.valueOf(request.get("userId").toString());
        String orderNo = (String) request.get("orderNo");
        BigDecimal totalAmount = new BigDecimal(request.get("totalAmount").toString());

        OrderEventDto orderEvent = new OrderEventDto(
                UUID.randomUUID().toString(),
                eventType,
                orderId,
                userId);
        orderEvent.setOrderNo(orderNo);
        orderEvent.setTotalAmount(totalAmount);
        orderEvent.setStatus(eventType.toLowerCase());

        kafkaProducerService.sendOrderEvent(orderEvent);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Order event created and sent");
        response.put("eventId", orderEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Send a batch of messages.
     */
    @PostMapping("/batch")
    public ResponseEntity<Map<String, String>> sendBatchMessages(@RequestBody Map<String, Object> request) {
        String topic = (String) request.get("topic");
        @SuppressWarnings("unchecked")
        List<String> messages = (List<String>) request.get("messages");

        List<Object> messageObjects = new ArrayList<>(messages);
        kafkaProducerService.sendBatchMessages(topic, messageObjects);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Batch sent");
        response.put("count", String.valueOf(messages.size()));
        return ResponseEntity.ok(response);
    }

    /**
     * Send a message to a specific partition.
     */
    @PostMapping("/partition")
    public ResponseEntity<Map<String, String>> sendToPartition(@RequestBody Map<String, Object> request) {
        String topic = (String) request.get("topic");
        Integer partition = (Integer) request.get("partition");
        String key = (String) request.get("key");
        Object message = request.get("message");

        kafkaProducerService.sendMessageToPartition(topic, partition, key, message);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Message sent to partition");
        response.put("partition", partition.toString());
        return ResponseEntity.ok(response);
    }
}
```
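The spring-kafka-test dependency added at the start makes it possible to verify the whole produce path against an in-memory broker. A minimal, hedged sketch (the test class and group names are our own; the property override points the application at the embedded broker, and contains() is used because JsonSerializer wraps the String value in JSON quotes):

```java
package com.example.demo;

import com.example.demo.service.KafkaProducerService;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertTrue;

@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(partitions = 1, topics = {"notifications"})
class NotificationFlowTest {

    @Autowired
    private KafkaProducerService producerService;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void notificationReachesTopic() {
        producerService.sendNotification("hello kafka");

        // Plain consumer subscribed to the embedded topic
        Map<String, Object> props = KafkaTestUtils.consumerProps("test-group", "true", embeddedKafka);
        try (Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(
                props, new StringDeserializer(), new StringDeserializer()).createConsumer()) {
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "notifications");
            ConsumerRecord<String, String> record =
                    KafkaTestUtils.getSingleRecord(consumer, "notifications");
            assertTrue(record.value().contains("hello kafka"));
        }
    }
}
```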
📊 Best Practices
1. Message Design
- Define a consistent message format across topics
- Version the message structure so it can evolve (see the envelope sketch after this list)
- Include the metadata consumers need, such as event ID, event type, and timestamp
- Keep changes backward compatible so existing consumers do not break
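One way to apply these points is a generic, versioned event envelope. A hypothetical sketch; the class and field names are our own and not part of the code above:

```java
import com.fasterxml.jackson.annotation.JsonFormat;

import java.time.LocalDateTime;

// Generic envelope: stable metadata on the outside, versioned payload inside.
public class EventEnvelope<T> {
    private String eventId;          // Unique ID, enables idempotent consumers
    private String eventType;
    private int schemaVersion = 1;   // Bump on breaking payload changes
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp = LocalDateTime.now();
    private T payload;               // The versioned business data

    // Getters and setters omitted for brevity
}
```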
2. Performance Tuning
- Choose batch sizes that match your latency budget (see the tuning sketch after this list)
- Use compression to reduce network transfer
- Pick an efficient serialization format
- Size partition counts to the parallelism you actually need
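These knobs map directly onto producer settings. A hedged sketch of a throughput-oriented variant of the producerFactory() map built earlier; the helper method is hypothetical and the numbers are starting points to validate under load, not recommendations:

```java
/**
 * Hypothetical helper for KafkaConfig: throughput-oriented overrides for
 * the producer settings built in producerFactory().
 */
private void tuneForThroughput(Map<String, Object> configProps) {
    configProps.put(ProducerConfig.LINGER_MS_CONFIG, 20);            // wait up to 20 ms to fill batches
    configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);    // larger batches, fewer requests
    configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // lighter on CPU than gzip
    configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024);
}
```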
3. Reliability
- Enable producer acknowledgments (acks=all, as configured above) and idempotence
- Make consumers idempotent
- Detect and handle duplicate messages
- Route repeatedly failing messages to a dead-letter topic (see the sketch after this list)
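On the producer side, adding `configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)` to producerFactory() lets the broker de-duplicate retried sends (it requires acks=all, which is already set). On the consumer side, a DeadLetterPublishingRecoverer gives failed records a dead-letter topic. A minimal sketch of a bean that could replace the plain DefaultErrorHandler registered in kafkaListenerContainerFactory() above (the bean name is our own):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// In KafkaConfig: retry a failed record 3 times, 1 second apart, then
// publish it to "<original-topic>.DLT" using the existing KafkaTemplate.
@Bean
public DefaultErrorHandler deadLetterErrorHandler(KafkaTemplate<String, Object> template) {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
}
```

Register it with factory.setCommonErrorHandler(deadLetterErrorHandler) in the container factory instead of the bare DefaultErrorHandler used earlier.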
4. Monitoring and Operations
- Watch for message backlog, i.e. consumer lag (see the sketch after this list)
- Track consumer processing latency
- Monitor cluster health
- Wire alerts to all of the above
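Consumer lag, the gap between a partition's latest offset and the group's committed offset, is the key backlog metric. A hedged sketch that computes it with Kafka's AdminClient; the group ID and broker address are the example values used throughout this article:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class ConsumerLagCheck {
    public static void main(String[] args) throws Exception {
        try (AdminClient admin = AdminClient.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // Offsets the group has committed, per partition
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("demo-group")
                         .partitionsToOffsetAndMetadata().get();

            // Latest (end) offsets for the same partitions
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            committed.keySet().forEach(tp -> request.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                    admin.listOffsets(request).all().get();

            // Lag = end offset - committed offset
            committed.forEach((tp, meta) -> System.out.printf(
                    "%s lag=%d%n", tp, latest.get(tp).offset() - meta.offset()));
        }
    }
}
```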
Keywords: Kafka, message queue, stream processing, distributed systems, event-driven architecture, microservice communication