背景
對幾十萬條用戶歷史存量數據寫入,且存在大對象的基礎上。kafka消費進行消費寫mysql超時。導致上游服務調用時異常,CPU飆高異常。
大對象解釋
大對象的定義與危害
1. 什么是大對象?
-
JVM 內存分配機制:Java 中對象優先分配在 Eden 區,但單個對象超過?
-XX:PretenureSizeThreshold
?閾值(默認與類型相關)時,會直接進入老年代。 -
典型場景:
-
超大數組/集合(如?
byte[10MB]
、List
?存儲萬級元素) -
未分頁的數據庫查詢結果(一次性加載百萬行數據)
-
緩存濫用(緩存未壓縮的圖片/文件)
-
未及時釋放的流處理數據(如未關閉的?
InputStream
)
-
2. 大對象如何引發 CPU 飆升?
-
GC 壓力:
-
頻繁 Full GC:老年代被大對象快速填滿,觸發 STW 的 Full GC,CPU 資源被垃圾回收線程獨占。
-
CMS/G1 并發失敗:并發回收期間老年代空間不足,退化為單線程 Full GC,導致長時間停頓。
-
-
序列化開銷:RPC 調用中,大對象的序列化/反序列化(如 Protobuf、JSON)會顯著消耗 CPU。
-
數據處理瓶頸:遍歷或操作大對象(如排序、轉換)導致 CPU 密集型計算。
二、大對象問題定位技巧
1. 診斷工具
-
內存分析:
-
jmap -histo:live <pid>
?直方圖統計對象分布
-
-
GC 日志:
-
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/path/to/gc.log
-
關注?
Full GC
?頻率和?OldGen
?使用率
-
三、大對象問題規避策略
1. 架構設計優化
-
分頁/分段處理:
// 錯誤:一次性查詢全量數據
List<User> users = userDao.findAll(); // 正確:分頁分批處理
int pageSize = 500;
for (int page = 0; ; page++) {List<User> batch = userDao.findByPage(page, pageSize);if (batch.isEmpty()) break;processBatch(batch);
}
2. 編碼規范
-
避免方法內大對象分配:
// 反例:在頻繁調用的方法中創建大數組
public void process() {byte[] buffer = new byte[10 * 1024 * 1024]; // 10MB 臨時數組// ...
}// 正例:復用對象或使用對象池
private static final ThreadLocal<ByteBuffer> bufferHolder = ThreadLocal.withInitial(() -> ByteBuffer.allocate(1024));
- 及時釋放資源
3. JVM 調優
Kafka消費者Rebalance機制
一、Kafka消費者機制與問題根源
消費者組(Consumer Group):多個消費者共同消費一個Topic的分區,實現負載均衡。每個分區僅由一個消費者處理。
Rebalance觸發條件:
1.消費者加入或離開組(如宕機、主動下線)。
2.消費者超過?
max.poll.interval.ms
?未發送心跳(默認5分鐘)。
問題現象:偏移量(Offset)未提交,消費者被判定為死亡,觸發Rebalance,消息重新分配給其他消費者,但新消費者同樣無法及時處理,形成惡性循環。
核心配置參數:
max.poll.records
:單次Poll拉取的最大消息數(默認500)
max.poll.interval.ms
:兩次Poll操作的最大允許間隔(默認5分鐘)
問題原因:處理500條消息耗時超過5分鐘,導致消費者被認為失效,觸發Rebalance,消息被重復分配但處理仍超時,最終服務崩潰。
二、處理邏輯與性能瓶頸
數據處理耗時分析
-
業務邏輯復雜度:每條消息需查詢歷史數據18萬條,涉及復雜計算或多次數據庫交互。
-
數據庫寫入瓶頸:
單條插入 vs 批量插入:單條插入導致頻繁事務提交,效率低下。
索引與鎖競爭:寫入時索引維護和行鎖可能引發性能下降。
-
代碼示例(低效寫入):
@KafkaListener(topics = "init_data_topic")
public void handleMessage(List<Message> messages) {for (Message msg : messages) {// 逐條查詢18萬條歷史數據List<HistoryData> data = queryHugeData(msg.getUserId());// 逐條寫入MySQLdata.forEach(d -> jdbcTemplate.update("INSERT INTO table ...", d));}
}
資源消耗與CPU飆升
-
GC壓力:頻繁創建大對象(如18萬條數據的List)導致Young GC頻繁,最終引發Full GC,CPU被GC線程占用。
-
線程阻塞:同步寫入數據庫時,線程因IO等待而阻塞,線程池滿載后任務堆積,進一步加劇延遲。
三、排查方向與優化策略
1.Kafka消費者配置調優
2.數據處理邏輯優化
分頁查詢與批量寫入:
將18萬條歷史數據分頁查詢,避免一次性加載到內存。
使用MySQL批量插入(
INSERT INTO ... VALUES (...), (...)
),減少事務開銷。
// 分頁查詢示例
int pageSize = 1000;
for (int page = 0; ; page++) {List<HistoryData> batch = queryByPage(msg.getUserId(), page, pageSize);if (batch.isEmpty()) break;batchInsertToMySQL(batch); // 批量寫入
}
總結
總而言之,對于大對象或者數據量過大的數據,每次查和寫入的數據量都要嚴格把控!!單次查詢和寫入數據量不超過1000!!!!!