Spring Boot 3.x from Beginner to Expert Series: 4.2 Kafka Integration in Detail

Spring Boot 3.x Kafka Integration in Detail

🎯 Introduction to Kafka

Apache Kafka is a distributed event-streaming platform used primarily to build real-time data pipelines and streaming applications. It offers high throughput, low latency, horizontal scalability, and fault tolerance.

Core Concepts

  • Producer: publishes messages to the Kafka cluster
  • Consumer: reads messages from the Kafka cluster
  • Topic: a named category of messages, comparable to a queue in traditional messaging
  • Partition: a physical subdivision of a topic that enables parallel processing
  • Broker: a server node in the Kafka cluster
  • Consumer Group: a group of consumers that jointly consume a topic's partitions
  • Offset: the position of a message within a partition
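To make Partition and Offset concrete: the message key chosen by the producer determines the target partition, and ordering is guaranteed only within a partition. The sketch below is a simplified stand-in (Kafka's real default partitioner hashes the serialized key with murmur2, not Java's `hashCode()`, and `PartitionSketch` is an invented name for illustration), but it shows why all messages with the same key keep their relative order:

```java
// Simplified illustration of key-based partitioning.
// NOTE: Kafka's actual default partitioner uses a murmur2 hash of the
// serialized key; Java's hashCode() is used here only to keep the sketch small.
class PartitionSketch {

    /** Maps a message key to one of numPartitions partitions. */
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is non-negative, then take the modulo.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("user-42", 3);
        int p2 = partitionFor("user-42", 3);
        // The same key always maps to the same partition, so all events
        // for user-42 stay ordered relative to each other.
        System.out.println("user-42 -> partition " + p1 + " (stable: " + (p1 == p2) + ")");
    }
}
```

This is also why the producer code later in this article uses `userId` and `orderId` as message keys: events belonging to one user or one order are consumed in order.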

Core Features

  • High throughput: handles millions of messages per second
  • Low latency: millisecond-level message delivery
  • Durability: messages are persisted to disk
  • Distributed: supports clustering and horizontal scaling
  • Fault tolerance: supports data replication and failure recovery

🚀 Quick Start

1. Add Dependencies

<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Kafka Starter -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- JSON processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- Bean validation -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    <!-- Test dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Kafka test support -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. Kafka Configuration

spring:
  # Kafka configuration
  kafka:
    # Kafka broker addresses
    bootstrap-servers: localhost:9092
    # Producer settings
    producer:
      # Number of retries
      retries: 3
      # Batch size in bytes
      batch-size: 16384
      # Buffer memory in bytes
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Acknowledgment mode
      acks: all
      # Compression type
      compression-type: gzip
      # Send timeouts
      properties:
        delivery.timeout.ms: 120000
        request.timeout.ms: 30000
    # Consumer settings
    consumer:
      # Consumer group ID
      group-id: demo-group
      # Disable auto-commit of offsets (we acknowledge manually)
      enable-auto-commit: false
      # Auto-commit interval (only used when enable-auto-commit is true)
      auto-commit-interval: 1000
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Start from the earliest message when no committed offset exists
      auto-offset-reset: earliest
      # Maximum records per poll
      max-poll-records: 500
      # Maximum fetch wait time in ms
      fetch-max-wait: 500
      # JSON deserialization settings
      properties:
        spring.json.trusted.packages: "com.example.demo.dto"
        spring.json.type.mapping: "userEvent:com.example.demo.dto.UserEventDto,orderEvent:com.example.demo.dto.OrderEventDto"
    # Listener settings
    listener:
      # Acknowledgment mode
      ack-mode: manual_immediate
      # Concurrency (consumer threads per listener)
      concurrency: 3
      # Poll timeout in ms
      poll-timeout: 3000
      # Listener type (batch; only applies to the auto-configured factory)
      type: batch

# Logging configuration
logging:
  level:
    org.springframework.kafka: DEBUG
    org.apache.kafka: DEBUG

🔧 Kafka Configuration Class

package com.example.demo.config;

import com.example.demo.dto.UserEventDto;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    /**
     * Producer configuration
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * KafkaTemplate
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Consumer configuration
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // JSON deserialization settings
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.demo.dto");
        props.put(JsonDeserializer.TYPE_MAPPINGS,
                "userEvent:com.example.demo.dto.UserEventDto,orderEvent:com.example.demo.dto.OrderEventDto");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent consumer threads
        factory.setConcurrency(3);
        // Manual, immediate offset acknowledgment
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Error handler
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }

    /**
     * User-event consumer factory
     */
    @Bean
    public ConsumerFactory<String, UserEventDto> userEventConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-event-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.demo.dto");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserEventDto.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * User-event listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserEventDto> userEventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserEventDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userEventConsumerFactory());
        factory.setConcurrency(2);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

📊 Message DTO Classes

1. User Event DTO

package com.example.demo.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import java.time.LocalDateTime;

public class UserEventDto {

    @NotBlank(message = "Event ID must not be blank")
    private String eventId;

    @NotBlank(message = "Event type must not be blank")
    private String eventType; // CREATE, UPDATE, DELETE

    @NotNull(message = "User ID must not be null")
    private Long userId;

    private String username;
    private String email;
    private String operation;
    private String operatorId;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    private Object data; // extra payload

    // Constructors
    public UserEventDto() {
        this.timestamp = LocalDateTime.now();
    }

    public UserEventDto(String eventId, String eventType, Long userId, String operation) {
        this();
        this.eventId = eventId;
        this.eventType = eventType;
        this.userId = userId;
        this.operation = operation;
    }

    // Getters and setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    public String getOperation() { return operation; }
    public void setOperation(String operation) { this.operation = operation; }
    public String getOperatorId() { return operatorId; }
    public void setOperatorId(String operatorId) { this.operatorId = operatorId; }
    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
    public Object getData() { return data; }
    public void setData(Object data) { this.data = data; }

    @Override
    public String toString() {
        return "UserEventDto{" +
                "eventId='" + eventId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", userId=" + userId +
                ", username='" + username + '\'' +
                ", operation='" + operation + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

2. Order Event DTO

package com.example.demo.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;

public class OrderEventDto {

    @NotBlank(message = "Event ID must not be blank")
    private String eventId;

    @NotBlank(message = "Event type must not be blank")
    private String eventType; // CREATED, PAID, SHIPPED, DELIVERED, CANCELLED

    @NotNull(message = "Order ID must not be null")
    private Long orderId;

    @NotNull(message = "User ID must not be null")
    private Long userId;

    private String orderNo;
    private BigDecimal totalAmount;
    private String status;
    private List<OrderItem> items;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    // Order line item
    public static class OrderItem {
        private Long productId;
        private String productName;
        private Integer quantity;
        private BigDecimal price;

        // Constructors
        public OrderItem() {}

        public OrderItem(Long productId, String productName, Integer quantity, BigDecimal price) {
            this.productId = productId;
            this.productName = productName;
            this.quantity = quantity;
            this.price = price;
        }

        // Getters and setters
        public Long getProductId() { return productId; }
        public void setProductId(Long productId) { this.productId = productId; }
        public String getProductName() { return productName; }
        public void setProductName(String productName) { this.productName = productName; }
        public Integer getQuantity() { return quantity; }
        public void setQuantity(Integer quantity) { this.quantity = quantity; }
        public BigDecimal getPrice() { return price; }
        public void setPrice(BigDecimal price) { this.price = price; }
    }

    // Constructors
    public OrderEventDto() {
        this.timestamp = LocalDateTime.now();
    }

    public OrderEventDto(String eventId, String eventType, Long orderId, Long userId) {
        this();
        this.eventId = eventId;
        this.eventType = eventType;
        this.orderId = orderId;
        this.userId = userId;
    }

    // Getters and setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public Long getOrderId() { return orderId; }
    public void setOrderId(Long orderId) { this.orderId = orderId; }
    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getOrderNo() { return orderNo; }
    public void setOrderNo(String orderNo) { this.orderNo = orderNo; }
    public BigDecimal getTotalAmount() { return totalAmount; }
    public void setTotalAmount(BigDecimal totalAmount) { this.totalAmount = totalAmount; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public List<OrderItem> getItems() { return items; }
    public void setItems(List<OrderItem> items) { this.items = items; }
    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "OrderEventDto{" +
                "eventId='" + eventId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", orderId=" + orderId +
                ", userId=" + userId +
                ", orderNo='" + orderNo + '\'' +
                ", totalAmount=" + totalAmount +
                ", status='" + status + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

📤 Message Producer

package com.example.demo.service;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Topic name constants
    public static final String USER_EVENTS_TOPIC = "user-events";
    public static final String ORDER_EVENTS_TOPIC = "order-events";
    public static final String NOTIFICATION_TOPIC = "notifications";

    /**
     * Send a user event
     */
    public void sendUserEvent(UserEventDto userEvent) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(USER_EVENTS_TOPIC, userEvent.getUserId().toString(), userEvent);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("User event sent: " + userEvent.getEventId()
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send user event: " + userEvent.getEventId()
                            + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending user event: " + e.getMessage());
        }
    }

    /**
     * Send an order event
     */
    public void sendOrderEvent(OrderEventDto orderEvent) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(ORDER_EVENTS_TOPIC, orderEvent.getOrderId().toString(), orderEvent);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Order event sent: " + orderEvent.getEventId()
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send order event: " + orderEvent.getEventId()
                            + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending order event: " + e.getMessage());
        }
    }

    /**
     * Send a notification message
     */
    public void sendNotification(String message) {
        try {
            String messageId = UUID.randomUUID().toString();
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(NOTIFICATION_TOPIC, messageId, message);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Notification sent: " + messageId
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send notification: " + messageId + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending notification: " + e.getMessage());
        }
    }

    /**
     * Send a message to a specific partition
     */
    public void sendMessageToPartition(String topic, int partition, String key, Object message) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(topic, partition, key, message);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Message sent to partition " + partition
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send message to partition: " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Exception while sending partitioned message: " + e.getMessage());
        }
    }

    /**
     * Send messages in batch
     */
    public void sendBatchMessages(String topic, List<Object> messages) {
        messages.forEach(message -> {
            String key = UUID.randomUUID().toString();
            kafkaTemplate.send(topic, key, message);
        });
        // Flush the buffer so the messages are sent immediately
        kafkaTemplate.flush();
        System.out.println("Batch of " + messages.size() + " messages sent");
    }
}

📥 Message Consumer

package com.example.demo.service;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class KafkaConsumerService {

    /**
     * Consume user events
     */
    @KafkaListener(topics = "user-events", groupId = "user-event-group",
            containerFactory = "userEventKafkaListenerContainerFactory")
    public void consumeUserEvent(@Payload UserEventDto userEvent,
                                 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                                 @Header(KafkaHeaders.OFFSET) long offset,
                                 Acknowledgment acknowledgment) {
        try {
            System.out.println("Received user event: " + userEvent);
            System.out.println("Topic: " + topic + ", Partition: " + partition + ", Offset: " + offset);
            // Business logic for the user event
            processUserEvent(userEvent);
            // Manually acknowledge the message
            acknowledgment.acknowledge();
            System.out.println("User event processed: " + userEvent.getEventId());
        } catch (Exception e) {
            System.err.println("Failed to process user event: " + e.getMessage());
            // Retry logic or a dead-letter topic could be implemented here
        }
    }

    /**
     * Consume order events
     */
    @KafkaListener(topics = "order-events", groupId = "order-event-group")
    public void consumeOrderEvent(ConsumerRecord<String, OrderEventDto> record,
                                  Acknowledgment acknowledgment) {
        try {
            OrderEventDto orderEvent = record.value();
            System.out.println("Received order event: " + orderEvent);
            System.out.println("Key: " + record.key() + ", Partition: " + record.partition()
                    + ", Offset: " + record.offset());
            // Business logic for the order event
            processOrderEvent(orderEvent);
            // Manually acknowledge the message
            acknowledgment.acknowledge();
            System.out.println("Order event processed: " + orderEvent.getEventId());
        } catch (Exception e) {
            System.err.println("Failed to process order event: " + e.getMessage());
        }
    }

    /**
     * Consume notification messages
     */
    @KafkaListener(topics = "notifications", groupId = "notification-group")
    public void consumeNotification(@Payload String message,
                                    @Header(KafkaHeaders.RECEIVED_KEY) String key,
                                    Acknowledgment acknowledgment) {
        try {
            System.out.println("Received notification: " + message);
            System.out.println("Message Key: " + key);
            // Business logic for the notification
            processNotification(message);
            // Manually acknowledge the message
            acknowledgment.acknowledge();
            System.out.println("Notification processed");
        } catch (Exception e) {
            System.err.println("Failed to process notification: " + e.getMessage());
        }
    }

    /**
     * Consume messages in batch.
     * NOTE: this listener requires a batch-enabled container factory
     * (factory.setBatchListener(true) or spring.kafka.listener.type=batch).
     */
    @KafkaListener(topics = "batch-topic", groupId = "batch-group")
    public void consumeBatchMessages(List<ConsumerRecord<String, Object>> records,
                                    Acknowledgment acknowledgment) {
        try {
            System.out.println("Received batch, size: " + records.size());
            for (ConsumerRecord<String, Object> record : records) {
                System.out.println("Processing message: Key=" + record.key()
                        + ", Value=" + record.value()
                        + ", Partition=" + record.partition()
                        + ", Offset=" + record.offset());
                // Process each record
                processBatchMessage(record.value());
            }
            // Acknowledge the whole batch at once
            acknowledgment.acknowledge();
            System.out.println("Batch processed");
        } catch (Exception e) {
            System.err.println("Failed to process batch: " + e.getMessage());
        }
    }

    /**
     * Consume from multiple topics
     */
    @KafkaListener(topics = {"user-events", "order-events"}, groupId = "multi-topic-group")
    public void consumeMultiTopicEvents(ConsumerRecord<String, Object> record,
                                        Acknowledgment acknowledgment) {
        try {
            String topic = record.topic();
            Object value = record.value();
            System.out.println("Received multi-topic message: Topic=" + topic + ", Value=" + value);
            // Dispatch by topic
            switch (topic) {
                case "user-events":
                    if (value instanceof UserEventDto) {
                        processUserEvent((UserEventDto) value);
                    }
                    break;
                case "order-events":
                    if (value instanceof OrderEventDto) {
                        processOrderEvent((OrderEventDto) value);
                    }
                    break;
                default:
                    System.out.println("Unknown topic: " + topic);
            }
            acknowledgment.acknowledge();
        } catch (Exception e) {
            System.err.println("Failed to process multi-topic message: " + e.getMessage());
        }
    }

    // Business handlers
    private void processUserEvent(UserEventDto userEvent) {
        // Dispatch on the user event type
        switch (userEvent.getEventType()) {
            case "CREATE":
                System.out.println("Handling user-created event: " + userEvent.getUserId());
                // Send a welcome email, initialize user data, etc.
                break;
            case "UPDATE":
                System.out.println("Handling user-updated event: " + userEvent.getUserId());
                // Sync user info to other systems
                break;
            case "DELETE":
                System.out.println("Handling user-deleted event: " + userEvent.getUserId());
                // Clean up user-related data
                break;
            default:
                System.out.println("Unknown user event type: " + userEvent.getEventType());
        }
    }

    private void processOrderEvent(OrderEventDto orderEvent) {
        // Dispatch on the order event type
        switch (orderEvent.getEventType()) {
            case "CREATED":
                System.out.println("Handling order-created event: " + orderEvent.getOrderId());
                // Reserve stock, send a confirmation email, etc.
                break;
            case "PAID":
                System.out.println("Handling order-paid event: " + orderEvent.getOrderId());
                // Update order status, prepare shipment, etc.
                break;
            case "SHIPPED":
                System.out.println("Handling order-shipped event: " + orderEvent.getOrderId());
                // Send tracking info, update status, etc.
                break;
            case "DELIVERED":
                System.out.println("Handling order-delivered event: " + orderEvent.getOrderId());
                // Confirm receipt, prompt for a review, etc.
                break;
            case "CANCELLED":
                System.out.println("Handling order-cancelled event: " + orderEvent.getOrderId());
                // Refund, roll back stock, etc.
                break;
            default:
                System.out.println("Unknown order event type: " + orderEvent.getEventType());
        }
    }

    private void processNotification(String message) {
        System.out.println("Handling notification: " + message);
        // Send email, SMS, push notifications, etc.
    }

    private void processBatchMessage(Object message) {
        System.out.println("Handling batch item: " + message);
        // Batch-processing logic
    }
}

🎮 Controller Layer

package com.example.demo.controller;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import com.example.demo.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import jakarta.validation.Valid;
import java.math.BigDecimal;
import java.util.*;

@RestController
@RequestMapping("/api/kafka")
@CrossOrigin(origins = "*")
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Send a user event
     */
    @PostMapping("/user-events")
    public ResponseEntity<Map<String, String>> sendUserEvent(@RequestBody @Valid UserEventDto userEvent) {
        kafkaProducerService.sendUserEvent(userEvent);
        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "User event sent");
        response.put("eventId", userEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Send an order event
     */
    @PostMapping("/order-events")
    public ResponseEntity<Map<String, String>> sendOrderEvent(@RequestBody @Valid OrderEventDto orderEvent) {
        kafkaProducerService.sendOrderEvent(orderEvent);
        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Order event sent");
        response.put("eventId", orderEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Send a notification message
     */
    @PostMapping("/notifications")
    public ResponseEntity<Map<String, String>> sendNotification(@RequestBody Map<String, String> request) {
        String message = request.get("message");
        kafkaProducerService.sendNotification(message);
        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Notification sent");
        return ResponseEntity.ok(response);
    }

    /**
     * Quickly create and send a user event
     */
    @PostMapping("/quick/user-event")
    public ResponseEntity<Map<String, String>> quickUserEvent(@RequestBody Map<String, Object> request) {
        String eventType = (String) request.get("eventType");
        Long userId = Long.valueOf(request.get("userId").toString());
        String username = (String) request.get("username");
        String email = (String) request.get("email");

        UserEventDto userEvent = new UserEventDto(
                UUID.randomUUID().toString(),
                eventType,
                userId,
                "API_OPERATION");
        userEvent.setUsername(username);
        userEvent.setEmail(email);
        userEvent.setOperatorId("system");

        kafkaProducerService.sendUserEvent(userEvent);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "User event created and sent");
        response.put("eventId", userEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Quickly create and send an order event
     */
    @PostMapping("/quick/order-event")
    public ResponseEntity<Map<String, String>> quickOrderEvent(@RequestBody Map<String, Object> request) {
        String eventType = (String) request.get("eventType");
        Long orderId = Long.valueOf(request.get("orderId").toString());
        Long userId = Long.valueOf(request.get("userId").toString());
        String orderNo = (String) request.get("orderNo");
        BigDecimal totalAmount = new BigDecimal(request.get("totalAmount").toString());

        OrderEventDto orderEvent = new OrderEventDto(
                UUID.randomUUID().toString(),
                eventType,
                orderId,
                userId);
        orderEvent.setOrderNo(orderNo);
        orderEvent.setTotalAmount(totalAmount);
        orderEvent.setStatus(eventType.toLowerCase());

        kafkaProducerService.sendOrderEvent(orderEvent);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Order event created and sent");
        response.put("eventId", orderEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Send messages in batch
     */
    @PostMapping("/batch")
    public ResponseEntity<Map<String, String>> sendBatchMessages(@RequestBody Map<String, Object> request) {
        String topic = (String) request.get("topic");
        @SuppressWarnings("unchecked")
        List<String> messages = (List<String>) request.get("messages");
        List<Object> messageObjects = new ArrayList<>(messages);

        kafkaProducerService.sendBatchMessages(topic, messageObjects);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Batch messages sent");
        response.put("count", String.valueOf(messages.size()));
        return ResponseEntity.ok(response);
    }

    /**
     * Send to a specific partition
     */
    @PostMapping("/partition")
    public ResponseEntity<Map<String, String>> sendToPartition(@RequestBody Map<String, Object> request) {
        String topic = (String) request.get("topic");
        Integer partition = (Integer) request.get("partition");
        String key = (String) request.get("key");
        Object message = request.get("message");

        kafkaProducerService.sendMessageToPartition(topic, partition, key, message);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Message sent to partition");
        response.put("partition", partition.toString());
        return ResponseEntity.ok(response);
    }
}

📊 Best Practices

1. Message Design

  • Design a clear, consistent message format
  • Use versioned message schemas
  • Include the necessary metadata
  • Plan for backward compatibility
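As a minimal sketch of the first two points, the hypothetical envelope below carries an explicit schema version alongside the payload, so consumers can branch on the version and stay backward compatible. The class and field names here are illustrative assumptions, not part of this article's DTOs:

```java
// Hypothetical versioned event envelope: consumers read schemaVersion first
// and only then interpret the payload, which lets old and new producers
// coexist on the same topic.
class VersionedEvent {
    private final int schemaVersion;
    private final String eventType;
    private final String payloadJson; // serialized business data

    VersionedEvent(int schemaVersion, String eventType, String payloadJson) {
        this.schemaVersion = schemaVersion;
        this.eventType = eventType;
        this.payloadJson = payloadJson;
    }

    int getSchemaVersion() { return schemaVersion; }
    String getEventType() { return eventType; }
    String getPayloadJson() { return payloadJson; }

    /** A consumer tolerant of older schema versions. */
    static String describe(VersionedEvent e) {
        if (e.getSchemaVersion() < 2) {
            return "legacy " + e.getEventType(); // old producers: fewer fields
        }
        return "v" + e.getSchemaVersion() + " " + e.getEventType();
    }
}
```

In a real deployment this role is often played by a schema registry rather than a hand-rolled envelope; the point is only that the version must travel with the message.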

2. Performance Tuning

  • Tune the producer batch size appropriately
  • Use compression to reduce network transfer
  • Optimize the serialization format
  • Choose a sensible partition count

3. Reliability

  • Enable producer acknowledgments (acks)
  • Make consumers idempotent
  • Handle duplicate messages
  • Set up a dead-letter queue
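Consumer idempotency and duplicate handling can be sketched in a few lines: remember the IDs of processed events and skip redeliveries. This is a deliberately minimal, in-memory illustration (`IdempotentHandler` is an invented name); a real system would back the ID set with a database unique constraint or Redis so deduplication survives restarts:

```java
import java.util.HashSet;
import java.util.Set;

// Minimal sketch of consumer-side idempotency: remember processed event IDs
// and skip duplicates caused by redelivery or producer retries.
class IdempotentHandler {
    private final Set<String> processedIds = new HashSet<>();
    private int effectCount = 0; // how many events actually took effect

    /** Returns true if the event was processed, false if it was a duplicate. */
    boolean handle(String eventId) {
        if (!processedIds.add(eventId)) {
            return false; // already seen: drop the duplicate
        }
        effectCount++;    // the real business side effect would happen here
        return true;
    }

    int getEffectCount() { return effectCount; }
}
```

This pairs naturally with the `eventId` field the article's DTOs already carry: the producer generates it once, and the consumer uses it as the deduplication key.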

4. Monitoring and Operations

  • Monitor message backlog
  • Track consumer lag
  • Monitor cluster health
  • Set up alerting
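The backlog and lag metrics above come down to one formula: per-partition lag is the broker's log-end offset minus the group's committed offset. The sketch below only illustrates the arithmetic (`LagSketch` is an invented name); in practice these offsets come from `AdminClient` or the `kafka-consumer-groups.sh` tool, not from hand-built maps:

```java
import java.util.Map;

// Sketch of the basic consumer-lag formula: per-partition lag is
// logEndOffset - committedOffset, summed over all partitions.
class LagSketch {

    /** Sums (logEndOffset - committedOffset) over all partitions. */
    static long totalLag(Map<Integer, Long> logEndOffsets,
                         Map<Integer, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag += e.getValue() - committed;
        }
        return lag;
    }
}
```

A sustained, growing total lag is the usual trigger for the alerting mentioned above: it means consumers are falling behind producers.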

Keywords: Kafka, message queue, stream processing, distributed systems, event-driven architecture, microservice communication

