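Environment Setup
Make sure the project declares dependencies for Spring Boot, Spring Cloud, Kafka, and MyBatis. A typical Maven dependency configuration is shown below. Entries without an explicit version assume that versions are managed elsewhere in the POM, for example by the Spring Boot parent and the Spring Cloud BOM: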
<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud Starter -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter</artifactId>
    </dependency>
    <!-- Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- MyBatis -->
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- Database driver (e.g. MySQL) -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
</dependencies>
Configuring Kafka
Set the Kafka properties in application.yml or application.properties:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
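For readers more familiar with the plain Kafka client, it may help to see how the consumer settings above correspond to raw client property names. The following is a minimal sketch using only java.util.Properties; the KafkaConsumerProps class and its build method are hypothetical helpers for illustration and are not part of Spring or Kafka.

```java
import java.util.Properties;

// Hypothetical helper (not a Spring or Kafka class): builds the raw Kafka
// consumer properties that correspond to the spring.kafka.consumer.* keys.
final class KafkaConsumerProps {
    private KafkaConsumerProps() {}

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // spring.kafka.bootstrap-servers
        props.setProperty("bootstrap.servers", bootstrapServers);
        // spring.kafka.consumer.group-id
        props.setProperty("group.id", groupId);
        // spring.kafka.consumer.auto-offset-reset
        props.setProperty("auto.offset.reset", "earliest");
        // spring.kafka.consumer.key-deserializer / value-deserializer
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

Calling KafkaConsumerProps.build("localhost:9092", "my-group") yields the same settings the YAML expresses. In a Spring Boot application this mapping is done by the framework, so the helper is only useful as a mental model.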
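Configuring MyBatis
Configure the data source and MyBatis in application.yml. If these settings share a file with the Kafka settings, merge both under a single spring: key, because YAML does not allow the same key twice: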
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test
    username: root
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver
mybatis:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.example.demo.model
Creating the Kafka Producer and Consumer
Define a Kafka producer for sending messages:
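import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Fire-and-forget: send() is asynchronous and its returned future is ignored here.
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}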
Define a Kafka consumer that receives messages from my-topic:
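import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    // Invoked once for every record received on "my-topic".
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}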
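The listener above only prints the payload. If a payload is meant to become a database row, it has to be parsed and validated first. The sketch below assumes a hypothetical "name,email" payload format, which the original does not specify; UserMessageParser is likewise an illustrative name, not a library class.

```java
// Hypothetical parser for an assumed "name,email" payload format.
final class UserMessageParser {
    private UserMessageParser() {}

    /** Returns {name, email}, or throws IllegalArgumentException if malformed. */
    static String[] parse(String message) {
        if (message == null) {
            throw new IllegalArgumentException("message is null");
        }
        // limit -1 keeps trailing empty fields so "Alice," is rejected below
        String[] parts = message.split(",", -1);
        if (parts.length != 2
                || parts[0].trim().isEmpty()
                || parts[1].trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "expected 'name,email' but got: " + message);
        }
        return new String[] { parts[0].trim(), parts[1].trim() };
    }
}
```

A listener could call UserMessageParser.parse(message) and pass the two fields to a User before invoking the mapper. A structured format such as JSON would usually be a sturdier choice than a comma-separated string.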
Creating the MyBatis Mapper and Entity
Define an entity class:
public class User {
    private Long id;
    private String name;
    private String email;

    // getters and setters
}
Create the MyBatis Mapper interface:
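import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Select;

@Mapper
public interface UserMapper {

    @Select("SELECT * FROM users WHERE id = #{id}")
    User findById(Long id);

    // useGeneratedKeys writes the auto-generated primary key back into user.id.
    @Insert("INSERT INTO users(name, email) VALUES(#{name}, #{email})")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    void insert(User user);
}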
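Integrating the Business Logic
Combine Kafka and MyBatis in the business logic. The example below saves a user to the database and then publishes a notification message to Kafka: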
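import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class UserService {

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private KafkaProducerService kafkaProducerService;

    // Persists the user first, then announces the save on "user-topic".
    public void processUser(User user) {
        userMapper.insert(user);
        kafkaProducerService.sendMessage("user-topic", "User saved: " + user.getName());
    }
}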
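The persist-then-publish order in processUser can be checked without a running broker or database by using constructor injection with hand-written fakes. The sketch below is framework-free and uses stand-in types: SketchUser, UserStorePort, MessagePort, SketchUserService, InMemoryUserStore, and RecordingMessages are all hypothetical names that mirror the roles of User, UserMapper, KafkaProducerService, and UserService. It is an illustration of the idea, not the tutorial's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the User entity (hypothetical, trimmed to what the sketch needs).
final class SketchUser {
    Long id;
    final String name;
    SketchUser(String name) { this.name = name; }
}

// Plays the role of UserMapper.
interface UserStorePort { void insert(SketchUser user); }

// Plays the role of KafkaProducerService.
interface MessagePort { void send(String topic, String message); }

final class SketchUserService {
    private final UserStorePort store;
    private final MessagePort messages;

    SketchUserService(UserStorePort store, MessagePort messages) {
        this.store = store;
        this.messages = messages;
    }

    void processUser(SketchUser user) {
        store.insert(user); // persist first; if this throws, nothing is published
        messages.send("user-topic", "User saved: " + user.name);
    }
}

// In-memory fake that assigns ids, imitating useGeneratedKeys + keyProperty.
final class InMemoryUserStore implements UserStorePort {
    final List<SketchUser> rows = new ArrayList<>();
    private long nextId = 1;

    @Override
    public void insert(SketchUser user) {
        user.id = nextId++;
        rows.add(user);
    }
}

// Fake sender that records what would have been published.
final class RecordingMessages implements MessagePort {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(String topic, String message) {
        sent.add(topic + "|" + message);
    }
}
```

After new SketchUserService(store, messages).processUser(new SketchUser("Alice")), the fake store holds one row with a generated id and the fake sender holds one entry for user-topic. The same constructor-injection shape could be applied to the real UserService if field injection makes testing awkward.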
Testing
Write a simple test controller to verify that the integration works:
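import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api")
public class TestController {

    @Autowired
    private UserService userService;

    // POST /api/user with a JSON body such as {"name": "...", "email": "..."}
    @PostMapping("/user")
    public String saveUser(@RequestBody User user) {
        userService.processUser(user);
        return "User saved successfully!";
    }
}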
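To call the endpoint from Java, a request can be built with the JDK's java.net.http API (Java 11 or later). The sketch below only constructs the request. The SaveUserRequest class is a hypothetical helper, and the base URL used in the usage note assumes the application runs locally on Spring Boot's default port 8080.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds a POST request for the /api/user endpoint.
final class SaveUserRequest {
    private SaveUserRequest() {}

    static HttpRequest build(String baseUrl, String name, String email) {
        // Naive JSON assembly: assumes name and email contain no characters
        // that need JSON escaping (quotes, backslashes, control characters).
        String json = "{\"name\":\"" + name + "\",\"email\":\"" + email + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/user"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }
}
```

The request can then be sent with HttpClient.newHttpClient().send(SaveUserRequest.build("http://localhost:8080", "Alice", "alice@example.com"), HttpResponse.BodyHandlers.ofString()). If everything is wired correctly, the response body should be the string returned by saveUser.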
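Notes
- Make sure the Kafka broker is started and running normally.
- Make sure the database is configured correctly and that the table structure matches the entity class.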
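- With spring-kafka on the classpath, Spring Boot's auto-configuration already enables @KafkaListener processing. Add the @EnableKafka annotation to a configuration class only if that auto-configuration is disabled or you configure Kafka manually.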
With these steps in place, Spring Cloud, Kafka, and MyBatis work together to send and receive messages and to perform database operations.