This section is fairly introductory, so it continues in the style of the earlier "lying-flat" (lazy-person) guides.
1. Downloading from the official website
1.1 Download and extract
First, download the binary tarball from the official website, copy it onto the Linux machine, and extract it to the target directory.
I put mine under /WORK/MIDDLEWARE/kafka/4.0.
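A minimal sketch of this step, assuming the Scala 2.13 build of Kafka 4.0.0; check https://kafka.apache.org/downloads for the current link, since older releases move to the archive mirror:
wget https://downloads.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz
mkdir -p /WORK/MIDDLEWARE/kafka
tar -xzf kafka_2.13-4.0.0.tgz -C /WORK/MIDDLEWARE/kafka
mv /WORK/MIDDLEWARE/kafka/kafka_2.13-4.0.0 /WORK/MIDDLEWARE/kafka/4.0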
1.2 Common configuration
# Default number of partitions per topic
num.partitions=4
# How long data is kept before being deleted (168 hours = 7 days)
log.retention.hours=168
# Storage path. Note: this holds the message data itself, not the application log
log.dirs=/WORK/MIDDLEWARE/kafka/4.0/kraft-combined-logs
# This one must be changed, or external clients will not be able to connect
# Use the machine's real IP here
advertised.listeners=PLAINTEXT://192.168.0.64:9092,CONTROLLER://192.168.0.64:9093
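One step that is easy to miss: Kafka 4.0 is KRaft-only (ZooKeeper is gone), so the storage directory must be formatted once before the first start. A minimal sketch, assuming the server.properties above:
cd /WORK/MIDDLEWARE/kafka/4.0
# generate a cluster id and format the log directory (run as the user that will run the broker)
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/server.properties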
1.3 Start on boot
Create /etc/systemd/system/kafka.service:
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/bin/bash -c 'source /etc/profile && /WORK/MIDDLEWARE/kafka/4.0/bin/kafka-server-start.sh /WORK/MIDDLEWARE/kafka/4.0/config/server.properties'
ExecStop=/bin/bash -c 'source /etc/profile && /WORK/MIDDLEWARE/kafka/4.0/bin/kafka-server-stop.sh'
Restart=on-failure

[Install]
WantedBy=multi-user.target
Enable it:
systemctl daemon-reload
systemctl enable kafka
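Then start the service and verify it is running:
systemctl start kafka
systemctl status kafka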
1.4 Creating a topic
bin/kafka-topics.sh --create --topic my-test-topic --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic my-test-topic --bootstrap-server localhost:9092
The describe output looks like this:
Topic: my-test-topic	Partition: 0	Leader: 1	Replicas: 1	Isr: 1	Elr: 	LastKnownElr:
Topic: my-test-topic	Partition: 1	Leader: 1	Replicas: 1	Isr: 1	Elr: 	LastKnownElr:
Topic: my-test-topic	Partition: 2	Leader: 1	Replicas: 1	Isr: 1	Elr: 	LastKnownElr:
Topic: my-test-topic	Partition: 3	Leader: 1	Replicas: 1	Isr: 1	Elr: 	LastKnownElr:
Since this is just a learning environment on a single node, no extra replicas were assigned to the partitions. In production, make sure replicas are spread across different brokers, using the following option:
--replica-assignment "<partition0>:<brokerA>,<brokerB>,…;<partition1>:<brokerC>,<brokerD>,…;…"
#如:
--replica-assignment "0:1,2;1:2,3;2:1,3
解釋一下,':‘前面的是分區的編號;’:'后面是這個分區的數據,分別放到哪個broker下
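Putting it together, a hedged example for a hypothetical three-broker cluster (broker ids 1, 2, 3), creating a topic with 3 partitions and 2 replicas each (the topic name my-ha-topic is made up for illustration):
bin/kafka-topics.sh --create --topic my-ha-topic \
  --bootstrap-server localhost:9092 \
  --replica-assignment "1:2,2:3,1:3"
Note that --replica-assignment cannot be combined with --partitions or --replication-factor; the assignment string already determines both.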
1.5 Installing kafka-ui
cd /WORK/MIDDLEWARE/kafka
mkdir kafka-ui
cd kafka-ui
vim docker-compose.yml
Edit the docker-compose file:
services:
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 9100:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
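Then bring the container up. kafka-ui listens on host port 9100 as mapped above; with DYNAMIC_CONFIG_ENABLED you register the cluster (the 192.168.0.64:9092 broker) through the web UI itself:
docker compose up -d
# then open http://192.168.0.64:9100 in a browser and add the cluster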
2. Spring Boot producer integration
2.1 pom dependencies
Note: indi.zhifa.engine-cloud:common-web-starter is a library I wrote myself to make bootstrapping web projects faster; you can download it from my Gitee.
<dependencies>
    <dependency>
        <groupId>indi.zhifa.engine-cloud</groupId>
        <artifactId>common-web-starter</artifactId>
        <version>${zhifa-engine.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>3.1.10</version>
    </dependency>
</dependencies>
2.2 Producer core Java code
Service:
@Slf4j
@Component
public class KafkaSendDataImpl implements IKafkaSendData {

    private final KafkaTemplate<String, String> mKafkaTemplate;
    private final FastJsonConfig mFastJsonConfig;

    public KafkaSendDataImpl(KafkaTemplate<String, String> pKafkaTemplate,
                             @Qualifier("simple-fastJson-config") FastJsonConfig pFastJsonConfig) {
        mKafkaTemplate = pKafkaTemplate;
        mFastJsonConfig = pFastJsonConfig;
    }

    @Override
    public void sendAsync(String topic, KafkaData pKafkaData) {
        String str = JSON.toJSONString(pKafkaData);
        try {
            mKafkaTemplate.send(topic, pKafkaData.getName(), str);
        } catch (Exception e) {
            log.error("Error while sending to Kafka: " + e.getMessage());
        }
    }
}
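The IKafkaSendData interface and the KafkaData DTO are not shown in the original; a minimal sketch of what they might look like (the field names are assumptions, and only getName() is actually used by the producer code above):
public interface IKafkaSendData {
    void sendAsync(String topic, KafkaData pKafkaData);
}

// Hypothetical DTO: only the name field is required by the code above
public class KafkaData {
    private String name;    // used as the Kafka message key
    private String payload; // assumed business payload

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }
}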
Controller:
@Slf4j
@Validated
@RequiredArgsConstructor
@Tag(name = "生產者")
@ZhiFaRestController
@RequestMapping("/kafka/produce")
public class KafkaProduceController {

    final IKafkaSendData mKafkaSendData;

    @PostMapping("/{topic}")
    public void sendAsync(@PathVariable("topic") String pTopic, @RequestBody KafkaData pKafkaData) {
        mKafkaSendData.sendAsync(pTopic, pKafkaData);
    }
}
Configuration:
server:
  # service port
  port: 8083

springdoc:
  swagger-ui:
    path: /swagger-ui.html
    tags-sorter: alpha
    operations-sorter: alpha
  api-docs:
    path: /v3/api-docs
  group-configs:
    - group: "Management API"
      paths-to-match: '/**'
      packages-to-scan:
        - indi.zhifa.study2025.test.kafka.producer.controller

zhifa:
  enum-memo:
    enabled: true
    enum-packages:
      - indi.zhifa.**.enums
    uri: /api/enum
  web:
    enabled: true

spring:
  profiles:
    active: local
  kafka:
    bootstrap-servers: 192.168.0.64:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: zstd
      # acks=0: highest throughput, fire-and-forget regardless of success
      # acks=1: respond once the leader has written the record
      # acks=-1/all: require all in-sync replicas to acknowledge
      # For IoT or log-collection workloads this is often 0. When Kafka is used purely
      # for decoupling, e.g. order processing, set it to all to avoid loss, and monitor
      # the send callback. Note: since Kafka 3.0 the client enables idempotence by
      # default, which requires acks=all.
      acks: all
      # number of retries
      retries: 3
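As a sketch, if you prefer to state idempotence explicitly rather than rely on the client default, the raw producer properties can be passed through (spring.kafka.producer.properties.* is the standard Spring Boot passthrough for native Kafka client settings):
spring:
  kafka:
    producer:
      acks: all
      properties:
        enable.idempotence: true                  # requires acks=all
        max.in.flight.requests.per.connection: 5  # must be <= 5 when idempotence is on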
Let's produce a few messages and observe what happens:
Open swagger-ui and fire a few requests; in kafka-ui we can then see that the message count has indeed gone up.
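For reference, the same request from the command line (the payload fields here are assumptions matching the hypothetical KafkaData sketch above):
curl -X POST http://localhost:8083/kafka/produce/my-test-topic \
  -H "Content-Type: application/json" \
  -d '{"name": "sensor-1", "payload": "hello kafka"}'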
2.3 What the key does
One extra note: when sending, you can specify a key for each message. By default, Kafka routes messages with the same key to the same partition. Here I use name as the key, which guarantees that messages with the same name are consumed in order.
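Conceptually, the default partitioner for a non-null key boils down to a hash modulo the partition count. A simplified sketch using Kafka's own utility methods (the real built-in partitioner has extra logic for null keys and sticky batching):
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class PartitionSketch {
    // Simplified view of the default partitioner for non-null keys:
    // same key -> same murmur2 hash -> same partition, hence per-key ordering.
    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
This also implies that adding partitions to a topic later changes the key-to-partition mapping, which breaks per-key ordering across the resize.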
3. Spring Boot consumer integration
The consumer side is very simple, so this part is kept brief.
3.1 Core Java code
@Component
public class KafkaConsumerListener {

    private final Map<String, Long> mMsgIdx;

    public KafkaConsumerListener() {
        mMsgIdx = new ConcurrentHashMap<>();
    }

    @KafkaListener(topics = "my-test-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record) {
        String key = record.key();           // the message key
        String value = record.value();       // the message value
        String topic = record.topic();       // the topic
        int partition = record.partition();  // the partition
        long offset = record.offset();       // the offset
        long timestamp = record.timestamp(); // the timestamp

        // Handle the message (here we just print it)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);

        if (StringUtils.hasText(key)) {
            Long idx = mMsgIdx.get(key);
            if (idx == null) {
                idx = 0L;
            }
            idx = idx + 1;
            mMsgIdx.put(key, idx);
            System.out.println("Message #" + idx + " for key " + key);
        }
    }
}
3.2 Configuration
spring:
  profiles:
    active: local
  kafka:
    bootstrap-servers: 192.168.0.64:9092
    consumer:
      group-id: my-group          # consumer group id
      auto-offset-reset: earliest # start from the beginning if there is no committed offset
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer