書接上回,第二次作業比較容易解決,我問了ai,讓他對我進行指導,按照它提供的步驟,我完成了本次實驗,接下來我會標注出需要注意的細節,指導大家完成此次任務。
🎯 一、作業目標
?? 目標關鍵詞:
? 搭建 Kafka 實現環境
? 使用 Java 寫一個“網站訂購數據實時統計程序”
? 保證數據可以:實時采集 → 實時消費 → 實時統計
? 提供完整演示文檔、錄屏資料
🚩 二、從頭到尾的執行步驟 & 細節
?準備階段
要點:
第一步:Kafka 官方下載二進制版本
Apache Kafka我下的是3.9.1版本
- 第二步:JDK 必須安裝(推薦 Oracle JDK 8 或 11,版本要和 IDEA 保持一致)我用的是8
要用郵箱注冊一下Oracle賬號,才能下載,不用虛擬機的話用windows版本的installer,下載后要記得目標文件的地址,下一步要用。
Java Downloads | Oracle 中國
- 第三步:配好 JAVA_HOME 和 PATH,必須 java -version 有輸出
Win+r后cmd,檢查你java成沒成,
之后你電腦設置,高級系統設置,配置環境變量
? 配置環境變量(最重要)
1?? 找到 系統環境變量 → Path
2?? 把這個 JDK 的 bin
目錄加進去,比如:
C:\Program Files\Java\jdk-11.0.20\bin
3?? 新建 JAVA_HOME
(可選但最好有):
變量名:JAVA_HOME 變量值:C:\Program Files\Java\jdk-11.0.20
?啟動服務端
第四步:ZooKeeper:
- 先啟動 ZooKeeper,新開一個窗口cmd:
- cd /d D:\kafka
- .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
- 正常輸出:看到 binding to port 2181,就 OK。
別關,放后臺即可
第五步:Kafka Broker:
- 新開一個窗口cmd,再啟動 Kafka:
- cd /d D:\kafka
- .\bin\windows\kafka-server-start.bat .\config\server.properties
- 正常輸出:看到 KafkaServer id=0 started,就 OK。
注意:
- ZooKeeper 和 Kafka 都必須一直開著!
- 啟動先后順序是:ZooKeeper → Kafka
? 第六步:創建 Topic
- 再新開一個 cmd:
- cd /d D:\kafka
- .\bin\windows\kafka-topics.bat --create --topic order-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
看到提示 Created topic order-topic. → 成功!
? 測試:命令行生產者 & 消費者(可選但很重要)
- Producer:
- .\bin\windows\kafka-console-producer.bat --topic order-topic --bootstrap-server localhost:9092
光標閃爍時輸入:
hello kafka
- Consumer:
- .\bin\windows\kafka-console-consumer.bat --topic order-topic --bootstrap-server localhost:9092 --from-beginning
Consumer 窗口要能顯示 Producer 發送的消息,證明消息管道通了。
? 用命令行測試 Producer / Consumer(可選但很有用)
這樣能保證:
-
Kafka 能正常收發消息
-
防止你寫 Java 程序時,結果沒出來卻不知道是 Kafka 還是代碼問題
📌 1?? 打開命令行 Producer
還是同樣流程:
cd /d D:\kafka
輸入:
.\bin\windows\kafka-console-producer.bat --topic order-topic --bootstrap-server localhost:9092
這時光標會閃爍,表示你可以直接輸入內容了。
📌 2?? 打開命令行 Consumer
再開一個新的 cmd:
cd /d D:\kafka
輸入:
.\bin\windows\kafka-console-consumer.bat --topic order-topic --bootstrap-server localhost:9092 --from-beginning
📌 3?? 測試
-
在 Producer 窗口輸入:
hello
然后回車
-
在 Consumer 窗口應該立刻顯示:
hello
? 如果收發正常,Kafka 配置一切正常!
? 第七步:iDEA 項目準備
- 創建 Maven 項目(方便自動管理 Kafka 依賴)
- pom.xml:
- <dependencies>
- ? <dependency>
- ??? <groupId>org.apache.kafka</groupId>
- ??? <artifactId>kafka-clients</artifactId>
- ??? <version>3.7.0</version>
- ? </dependency>
- </dependencies>
?? dependencies 必須在 <project> 里,別寫到外面。3.7.0是kafka版本號,你下載什么版本就用什么號碼
- 如果下載依賴慢,就加:
- <repositories>
- ? <repository>
- ??? <id>aliyun</id>
- ??? <url>https://maven.aliyun.com/repository/public</url>
- ? </repository>
- </repositories>
- 一定要點擊 Reload Maven,看 External Libraries 有 kafka-clients。
以下是我的pom文件內容:
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>JavaProducerConsumer</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.1</version></dependency></dependencies></project>
依賴寫進去了之后,保存你的文件,要等它下載好,之后就是代碼了
? 第八步:編寫 Producer
- 在 src/main/java 下新建類 OrderProducer
- Producer 要點:
- Kafka 地址:localhost:9092
- Topic 名:order-topic
- 代碼核心:隨機生成訂單數據,循環發送
- 右鍵 Run,看到:
- Sent: Order_123,456.78
就說明 Producer 已經把消息丟到 Kafka 了。
package com.example.kafka;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties; import java.util.Random;public class OrderProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(props);Random random = new Random();while (true) {String orderId = "Order_" + random.nextInt(1000);double amount = 100 + random.nextDouble() * 900;String msg = orderId + "," + amount;producer.send(new ProducerRecord<>("order-topic", orderId, msg));System.out.println("Sent: " + msg);Thread.sleep(1000); // 每秒一單}} }
? 第九步:編寫 Consumer
- 新建 OrderConsumer
- Consumer 要點:
- Kafka 地址:localhost:9092
- Topic 名:order-topic
- 邏輯:循環消費消息、解析金額、累計統計、打印實時結果
- 右鍵 Run,會實時輸出:
- Received: Order_123,456.78
- Total Orders: X, Total Amount: Y
package com.example.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Collections; import java.util.Properties;public class OrderConsumer {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("order-topic"));int totalOrders = 0;double totalAmount = 0;while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {String[] parts = record.value().split(",");double amount = Double.parseDouble(parts[1]);totalOrders++;totalAmount += amount;System.out.println("Received: " + record.value());System.out.println("Total Orders: " + totalOrders + ", Total Amount: " + totalAmount);}}} }
?第十步:?驗證功能完整
先運行producer,再運行consumer,截圖,或者錄屏,(如果能運行代碼,你知道怎么截圖和錄屏的)因為這就是你做成這個實驗的成果
? Producer 持續發
? Consumer 實時收
? Consumer 實時匯總統計
這里錄個屏,截幾張圖就行
? ?結果整理(交付要點)
🌟 交什么?
- Kafka & ZooKeeper 啟動截圖
- order-topic 創建命令 & 成功提示
- Producer/Consumer 運行效果截圖
- IDEA 項目結構截圖
- 可選:錄屏演示從 Producer 發單 → Consumer 實時統計的整個流程
🌟 文檔要點:
- 簡要寫你怎么搭環境(包括下載、配置、啟動命令)
- 貼關鍵截圖
- 解釋你的 Producer/Consumer 邏輯(可以復制關鍵代碼)
- 結果證明:實時采集、實時消費、實時統計三點都 OK
🌟 錄屏要點(如果老師要求):
- 演示 Kafka 啟動、Topic 創建
- 演示 Producer 發送數據
- 演示 Consumer 實時消費并輸出統計
- 錄制 30 秒左右,聲音可有可無
? 可能遇到的坑
- Kafka 啟動失敗:通常是端口沖突或 ZooKeeper 沒開。
- java 不是內部命令:JDK 沒裝好或 PATH 沒配好。
- IDEA 報 org.apache.kafka 不存在:依賴沒寫對或沒刷新 Maven。
- 消費者收不到消息:Topic 名拼錯,或者 Producer 和 Consumer 連接不同的 Kafka。
🚩 完整要點 = 這 10 步,一步不錯
做完這些,你這份 Kafka 作業就 100% 滿分沒跑
還方便下次復用!
最后,這個不是真的網站的數據來源,作為初學者,任務要達到什么程度,可以再明確一下: