Java HashMap中的compute及相關方法詳解:從基礎到Kafka Stream應用

H

ashMap是Java集合框架中最常用的數據結構之一,它提供了高效的鍵值對存儲和檢索功能。在Java8中,HashMap引入了一系列新的原子性更新方法,包括compute()computeIfAbsent()computeIfPresent()等,這些方法極大地簡化了在Map中進行復雜更新操作的代碼。本文將詳細介紹這些方法,包括它們的用法、示例和實際應用場景,并特別探討它們在Kafka Stream數據處理中的實際應用。

在這里插入圖片描述

1. compute()方法

方法簽名

default V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)

功能說明

compute()方法用于根據指定的鍵和其當前映射值(如果沒有當前映射值則為null)計算一個新的映射值。這個方法是原子性的,意味著在多線程環境下可以安全使用。

參數

  • key: 要計算的鍵
  • remappingFunction: 接受鍵和當前值作為參數,返回新值的函數

返回值

  • 返回與鍵關聯的新值,如果沒有值與鍵關聯(且remappingFunction返回null),則返回null

示例

import java.util.HashMap;
import java.util.Map;public class ComputeExample {public static void main(String[] args) {Map<String, Integer> map = new HashMap<>();map.put("apple", 1);map.put("banana", 2);// 使用compute方法增加apple的數量map.compute("apple", (k, v) -> v + 1);System.out.println(map); // 輸出: {apple=2, banana=2}// 對不存在的鍵使用compute方法map.compute("orange", (k, v) -> v == null ? 1 : v + 1);System.out.println(map); // 輸出: {apple=2, banana=2, orange=1}// 使用compute方法刪除條目(返回null)map.compute("banana", (k, v) -> null);System.out.println(map); // 輸出: {apple=2, orange=1}}
}

用途

  • 當需要基于當前值計算新值時(如計數器增加)
  • 當需要根據鍵和當前值決定是否保留、更新或刪除條目時
  • 替代傳統的"檢查是否存在,然后put"模式

2. computeIfAbsent()方法

方法簽名

default V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction)

功能說明

computeIfAbsent()方法僅在指定的鍵尚未與值關聯(或映射為null)時計算一個新值并將其放入Map中。

參數

  • key: 要檢查的鍵
  • mappingFunction: 接受鍵作為參數,返回新值的函數

返回值

  • 返回與鍵關聯的當前(現有或計算的)值,如果沒有值與鍵關聯,則返回null

示例

import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;public class ComputeIfAbsentExample {public static void main(String[] args) {Map<String, List<String>> map = new HashMap<>();// 使用computeIfAbsent初始化列表map.computeIfAbsent("fruits", k -> new ArrayList<>()).add("apple");map.computeIfAbsent("fruits", k -> new ArrayList<>()).add("banana");map.computeIfAbsent("vegetables", k -> new ArrayList<>()).add("carrot");System.out.println(map); // 輸出: {fruits=[apple, banana], vegetables=[carrot]}// 對已存在的鍵不會重新計算List<String> fruits = map.computeIfAbsent("fruits", k -> new ArrayList<>());fruits.add("orange");System.out.println(map); // 輸出: {fruits=[apple, banana, orange], vegetables=[carrot]}}
}

用途

  • 延遲初始化(如上面的列表示例)
  • 緩存實現(當需要時才計算值)
  • 避免重復計算相同的鍵

3. computeIfPresent()方法

方法簽名

default V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)

功能說明

computeIfPresent()方法僅在指定的鍵已與值關聯時計算一個新值并將其放入Map中。

參數

  • key: 要檢查的鍵
  • remappingFunction: 接受鍵和當前值作為參數,返回新值的函數

返回值

  • 返回與鍵關聯的新值,如果沒有值與鍵關聯,則返回null

示例

import java.util.HashMap;
import java.util.Map;public class ComputeIfPresentExample {public static void main(String[] args) {Map<String, Integer> map = new HashMap<>();map.put("apple", 1);map.put("banana", 2);// 使用computeIfPresent增加apple的數量map.computeIfPresent("apple", (k, v) -> v + 1);System.out.println(map); // 輸出: {apple=2, banana=2}// 對不存在的鍵使用computeIfPresent不會有任何效果map.computeIfPresent("orange", (k, v) -> v + 1);System.out.println(map); // 輸出: {apple=2, banana=2}// 使用computeIfPresent刪除條目(返回null)map.computeIfPresent("banana", (k, v) -> null);System.out.println(map); // 輸出: {apple=2}}
}

用途

  • 當需要基于現有值更新值時(如計數器增加)
  • 當需要根據條件刪除條目時
  • 替代傳統的"檢查是否存在,然后更新"模式

4. merge()方法

雖然不是嚴格意義上的compute方法,但merge()方法與這些方法功能相似,也值得介紹。

方法簽名

default V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction)

功能說明

merge()方法將指定的值與鍵的當前值(如果存在)合并,使用提供的合并函數。如果鍵沒有當前映射,則直接將鍵與指定值關聯。

參數

  • key: 要合并的鍵
  • value: 要合并的值
  • remappingFunction: 接受當前值和指定值作為參數,返回合并后的值的函數

返回值

  • 返回與鍵關聯的新值,如果沒有值與鍵關聯,則返回指定的值

示例

import java.util.HashMap;
import java.util.Map;public class MergeExample {public static void main(String[] args) {Map<String, Integer> map = new HashMap<>();map.put("apple", 1);map.put("banana", 2);// 使用merge方法增加apple的數量map.merge("apple", 1, (oldValue, newValue) -> oldValue + newValue);System.out.println(map); // 輸出: {apple=2, banana=2}// 對不存在的鍵使用merge方法直接添加map.merge("orange", 3, (oldValue, newValue) -> oldValue + newValue);System.out.println(map); // 輸出: {apple=2, banana=2, orange=3}// 使用merge方法刪除條目(合并函數返回null)map.merge("banana", 1, (oldValue, newValue) -> null);System.out.println(map); // 輸出: {apple=2, orange=3}}
}

用途

  • 合并兩個值(如計數器累加)
  • 當需要基于現有值和新值計算新值時
  • 替代傳統的"檢查是否存在,然后合并"模式

5. 方法對比

方法觸發條件參數典型用途
compute()總是執行鍵和BiFunction(鍵,當前值→新值)基于鍵和當前值計算新值
computeIfAbsent()鍵不存在或值為null鍵和Function(鍵→新值)延遲初始化,避免重復計算
computeIfPresent()鍵存在且值不為null鍵和BiFunction(鍵,當前值→新值)基于現有值更新值
merge()總是執行鍵、值和BiFunction(當前值,新值→合并值)合并兩個值

6. 實際應用場景

6.1 緩存實現

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;public class CacheExample {private final Map<String, String> cache = new HashMap<>();public String get(String key, Function<String, String> loader) {return cache.computeIfAbsent(key, loader);}public static void main(String[] args) {CacheExample cache = new CacheExample();String value = cache.get("data", key -> {// 模擬從數據庫加載數據System.out.println("Loading data for " + key);return "Value for " + key;});System.out.println(value);// 再次獲取相同key不會重新加載value = cache.get("data", key -> {System.out.println("This won't be printed");return "New value";});System.out.println(value);}
}

6.2 計數器

import java.util.HashMap;
import java.util.Map;public class CounterExample {public static void main(String[] args) {Map<String, Integer> wordCounts = new HashMap<>();String[] words = {"apple", "banana", "apple", "orange", "banana", "apple"};for (String word : words) {wordCounts.merge(word, 1, Integer::sum);}System.out.println(wordCounts); // 輸出: {orange=1, banana=2, apple=3}}
}

6.3 配置合并

import java.util.HashMap;
import java.util.Map;public class ConfigMergeExample {public static void main(String[] args) {Map<String, String> defaultConfig = new HashMap<>();defaultConfig.put("timeout", "1000");defaultConfig.put("retries", "3");Map<String, String> userConfig = new HashMap<>();userConfig.put("timeout", "2000");// 合并配置,用戶配置優先userConfig.forEach((key, value) -> defaultConfig.merge(key, value, (oldVal, newVal) -> newVal));System.out.println(defaultConfig); // 輸出: {timeout=2000, retries=3}}
}

7. Kafka Stream中的HashMap compute方法應用

Kafka Stream是一個用于構建流處理應用的Java庫,它提供了高級抽象來處理數據流。在Kafka Stream應用中,我們經常需要維護狀態(如計數器、聚合結果等),而HashMap及其compute方法家族非常適合這種場景。

7.1 Kafka Stream狀態存儲基礎

Kafka Stream提供了KeyValueStore接口用于狀態存儲,但底層實現通常基于HashMap或其他高效的數據結構。當我們需要在Kafka Stream應用中維護自定義狀態時,compute方法家族可以發揮巨大作用。

7.2 實時計數器示例

假設我們有一個Kafka Stream應用,需要統計每個產品的購買次數:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import java.util.HashMap;
import java.util.Map;public class ProductCounterProcessor implements Processor<String, String, String, Long> {private final Map<String, Long> productCounts = new HashMap<>();@Overridepublic void init(ProcessorContext<String, Long> context) {// 初始化代碼}@Overridepublic void process(Record<String, String> record) {String productId = record.key();// 使用compute方法原子性地增加計數器productCounts.compute(productId, (k, v) -> v == null ? 1L : v + 1L);// 可以定期將狀態寫入Kafka狀態存儲或發送到下游// 這里簡化處理,直接轉發結果context.forward(new Record<>(productId, productCounts.get(productId), record.timestamp()));}@Overridepublic void close() {// 清理代碼}
}

在這個例子中,compute()方法確保了即使在高并發環境下,計數器也能正確更新,避免了傳統的"檢查-然后-更新"模式可能導致的競態條件。

7.3 會話窗口聚合

在Kafka Stream中處理會話窗口時,我們經常需要維護會話狀態。computeIfPresent()方法非常適合這種場景:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import java.util.HashMap;
import java.util.Map;public class SessionAggregatorProcessor implements Processor<String, UserEvent, String, SessionSummary> {private final Map<String, SessionSummary> activeSessions = new HashMap<>();@Overridepublic void init(ProcessorContext<String, SessionSummary> context) {// 初始化代碼}@Overridepublic void process(Record<String, UserEvent> record) {String userId = record.key();UserEvent event = record.value();// 使用computeIfPresent更新現有會話activeSessions.computeIfPresent(userId, (k, session) -> {session.addEvent(event);if (session.isExpired()) {// 會話過期,發送結果并移除context.forward(new Record<>(userId, session.toSummary(), record.timestamp()));return null; // 返回null會刪除該條目}return session;});// 使用computeIfAbsent創建新會話activeSessions.computeIfAbsent(userId, k -> {SessionSummary newSession = new SessionSummary(event);return newSession;});}@Overridepublic void punctuate(long timestamp) {// 定期檢查并關閉過期會話activeSessions.entrySet().removeIf(entry -> {if (entry.getValue().isExpired()) {context.forward(new Record<>(entry.getKey(), entry.getValue().toSummary(), timestamp));return true;}return false;});}@Overridepublic void close() {// 清理代碼}
}

在這個例子中,我們結合使用了computeIfPresent()computeIfAbsent()方法來高效地管理會話狀態,確保會話的正確創建、更新和過期處理。

7.4 窗口化聚合

對于基于時間的窗口聚合,merge()方法特別有用:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import java.util.HashMap;
import java.util.Map;public class WindowedAggregatorProcessor implements Processor<String, SalesEvent, String, SalesSummary> {private final Map<String, SalesSummary> windowSums = new HashMap<>();@Overridepublic void init(ProcessorContext<String, SalesSummary> context) {// 初始化代碼}@Overridepublic void process(Record<String, SalesEvent> record) {String productId = record.key();SalesEvent event = record.value();// 使用merge方法合并銷售事件到窗口匯總windowSums.merge(productId, new SalesSummary(event), (existingSum, newEvent) -> existingSum.merge(newEvent));// 定期發送窗口匯總結果if (shouldSendWindowResult()) {windowSums.forEach((k, v) -> context.forward(new Record<>(k, v, record.timestamp())));windowSums.clear(); // 清空窗口}}@Overridepublic void close() {// 清理代碼}private boolean shouldSendWindowResult() {// 實現窗口觸發邏輯return false;}
}

在這個例子中,merge()方法簡化了窗口內銷售事件的聚合過程,使我們能夠高效地計算每個產品在當前窗口內的銷售匯總。

8. 性能考慮與Kafka Stream集成

在Kafka Stream應用中使用這些compute方法時,需要注意以下幾點:

  1. 線程安全性:Kafka Stream處理器通常是單線程處理每個分區,因此不需要額外的同步措施。但如果在多線程環境中使用HashMap,應考慮使用ConcurrentHashMap及其原子性方法。
  2. 狀態存儲:對于需要持久化的狀態,Kafka Stream提供了Stores工廠類來創建持久化狀態存儲。這些存儲底層可能使用類似HashMap的結構,但提供了容錯能力。
  3. 內存管理:在處理大規模數據時,要注意HashMap的內存使用情況,避免OOM錯誤。可以考慮使用更高效的數據結構或定期清理過期狀態。
  4. 容錯性:雖然compute方法提供了原子性操作,但在分布式環境中,還需要考慮Kafka Stream提供的檢查點機制來確保狀態的一致性。

9. 總結

Java 8引入的compute()computeIfAbsent()computeIfPresent()merge()等方法極大地增強了HashMap的功能,使開發者能夠以更簡潔、更安全的方式執行復雜的Map更新操作。這些方法特別適合以下場景:

  • 需要基于當前值計算新值時(如計數器增加)
  • 當需要根據鍵和當前值決定是否保留、更新或刪除條目時
  • 延遲初始化或緩存實現
  • 合并值的場景

在Kafka Stream數據處理中,這些方法特別有價值,因為它們:

  1. 簡化了狀態管理代碼
  2. 提供了原子性操作,避免了競態條件
  3. 使實時聚合和計數實現更加簡潔
  4. 與Kafka Stream的處理器API完美配合

掌握這些方法可以顯著提高Kafka Stream應用的開發效率和代碼質量,特別是在需要維護復雜狀態的應用場景中。無論是簡單的計數器還是復雜的會話窗口聚合,這些compute方法都能提供優雅的解決方案。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/93448.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/93448.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/93448.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【php中ssti模板注入講解】

php中場景模板 1. Smarty 使用安全模式來執行不信任的模板,只運行PHP白名單里的函數。 2. Twig 與Smarty類似,不過無法利用該模板的SSTI調用靜函數。 php常見模板入門 Smarty 不使用預先準備好的模板 使用預先準備好的模板 對值進行拼接后使用模板展示 設置在模板中…

Redis學習07-Redis的過期策略

Redis 過期策略 什么是過期策略 Redis 的過期策略用于管理設置了過期時間&#xff08;TTL&#xff09;的鍵&#xff0c;確保在鍵過期后能夠被及時刪除&#xff0c;從而釋放內存 整體策略 Redis 采用的是定期刪除惰性刪除的組合策略 1. 定期刪除 原理&#xff1a;周期性的從過期…

深入解讀c++(命名空間)

目錄 1關于命名空間 1.1是什么 1.2解決了什么問題 2.命名空間的定義 2.2命名空間的嵌套定義 3命名空間的特點 3.1命名空間不會影響生命周期 3.2命名空間只能在全局域里定義&#xff0c;當然嵌套定義時例外。 3.3在不同文件中定義相同名稱的命名空間 4.命名空間的使用 …

ClickHouse高性能實時分析數據庫-高性能的模式設計

告別等待&#xff0c;秒級響應&#xff01;這不只是教程&#xff0c;這是你駕馭PB級數據的超能力&#xff01;我的ClickHouse視頻課&#xff0c;凝練十年實戰精華&#xff0c;從入門到精通&#xff0c;從單機到集群。點開它&#xff0c;讓數據處理速度快到飛起&#xff0c;讓你…

ArkTS懶加載LazyForEach的基本使用

在 ArkTS 的開發中&#xff0c;如果你要渲染一個很長的列表&#xff0c;比如商品列表、評論列表或者朋友圈動態&#xff0c;用傳統的循環結構&#xff08;比如 ForEach&#xff09;很容易導致性能問題&#xff0c;尤其是加載慢、卡頓甚至內存暴漲。 這時候就要用到 懶加載渲染組…

動態規劃:從入門到精通

本文全章節一共一萬七千多字&#xff0c;詳細介紹動態規劃基礎與進階技巧&#xff0c;全篇以代碼為主&#xff0c;認真讀完理解&#xff0c;你對動態規劃的理解一定會有一個質的飛躍。一、動態規劃簡介: 動態規劃&#xff08;Dynamic Programming&#xff0c;簡稱DP&…

八股訓練營 40 天心得:一場結束,也是一場新的開始

八股訓練營 40 天心得&#xff1a;一場結束&#xff0c;也是一場新的開始 感謝卡哥的訓練營組織卡碼筆記&#xff0c;對即將參加秋招的我們幫助了很多&#xff0c;感謝卡哥的開源代碼隨想錄代碼隨想錄 四十天前&#xff0c;我帶著一顆不安卻堅定的心&#xff0c;踏入了這場“…

STM32系統定時器(SysTick)詳解:從原理到實戰的精確延時與任務調度

前言&#xff1a;為什么SysTick是嵌入式開發的"瑞士軍刀"&#xff1f; 在STM32開發中&#xff0c;我們經常需要精確的延時功能&#xff08;如毫秒級延時控制LED閃爍&#xff09;或周期性任務調度&#xff08;如定時采集傳感器數據&#xff09;。實現這些功能的方式有…

【微信小程序】12、生物認證能力

1、生物認證 生物認證 是一種基于個體獨特生理或行為特征進行身份驗證的技術,廣泛應用于安全、金融、醫療等領域。 小程序目前暫時只支持指紋識別認證。 2、查詢支持的生物認證方式 獲取本機支持的 SOTER 生物認證方式&#xff0c;文檔 onLoad(options) {wx.checkIsSuppor…

高級機器學習

機器學習常見方法涉及方法&#xff1a;2.半監督學習3.無監督學習4.度量學習5.遷移學習6.多示例多標記學習7.在線學習8.元學習9.聯邦學習10.強化學習11.概率圖模型獨立同分布獨立指的是&#xff0c;樣本集包括訓練集測試集的任意兩個樣本之間都是不相關的。在表示樣本的特征確定…

Chrome 提示 “此擴展程序不再受支持”(MacOS/Windows)

原因 最新 Chrome 使用 Manifest V3, 并在新版瀏覽器中 停止 V2 支持 處理方法 MacOS 新建一個后綴為 .mobileconfig 的文件, 內容參考 <?xml version"1.0" encoding"UTF-8"?> <!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN&…

C++20協程實戰:高效網絡庫、手機終端、多媒體開發開發指南

基于C++協程和事件循環的網絡庫 以下是基于C++協程和事件循環的網絡庫實例,涵蓋常見場景和功能實現。示例基于libuv、Boost.Asio或自定義事件循環,結合C++20協程(如std::coroutine)或其他協程庫(如cppcoro)實現。 基礎TCP服務器 #include <cppcoro/task.hpp> #in…

數據庫4.0

索引 事務 JDBC~ 目錄 一、MySQL索引 1.0 概述 2.0 相關操作 3.0 注意 4.0 索引背后的原理的理解 二、 事務 1.0 原子性 2.0 隔離性 (1)并發執行 (2) 出現的問題 3.0 使用 三、JDBC編程 1.0 概述 2.0 如何下載驅動包 3.0 jar如何引入到項目之中 4.0 jdbc…

HarmonyOS-ArkUI Web控件基礎鋪墊6--TCP協議- 流量控制算法與擁塞控制算法

HarmonyOS-ArkUI Web控件基礎鋪墊1-HTTP協議-數據包內容-CSDN博客 HarmonyOS-ArkUI Web控件基礎鋪墊2-DNS解析-CSDN博客 HarmonyOS-ArkUI Web控件基礎鋪墊3--TCP協議- 從規則本質到三次握手-CSDN博客 HarmonyOS-ArkUI Web控件基礎鋪墊4--TCP協議- 斷聯-四次揮手解析-CSDN博客…

Dify 從入門到精通(2/100 篇):Dify 的核心組件 —— 從節點到 RAG 管道

Dify 的核心組件&#xff1a;從節點到 RAG 管道 引言 在 Dify 博客系列&#xff1a;從入門到精通&#xff08;100 篇&#xff09; 的第一篇《Dify 究竟是什么&#xff1f;真能開啟低代碼 AI 應用開發的未來&#xff1f;》中&#xff0c;我們全面介紹了 Dify 的定位、核心特點…

在線培訓、遠程示教——醫療器械行業的直播解決方案

文章目錄前言一、醫療器械直播應用的兩大核心場景二、直播平臺在醫療場景中的關鍵技術支持點三、典型功能實現原理總結前言 醫療器械行業對“培訓”和“示教”的專業性要求極高&#xff0c;傳統的線下模式常因時間、空間、人員成本等受限而效率低下。而隨著高清低延遲視頻技術…

Mqttnet的MqttClientTlsOptions.CertificateValidationHandler詳解

MqttClientTlsOptions.CertificateValidationHandler 是 MQTTnet 庫中用于自定義 TLS 證書驗證邏輯的關鍵回調函數。在 MQTT 客戶端與服務器建立 TLS 連接時&#xff0c;該回調允許你覆蓋默認的證書驗證流程&#xff0c;實現自定義的安全策略。核心作用當 MQTT 客戶端通過 TLS …

【圖像噪點消除】——圖像預處理(OpenCV)

目錄 1 均值濾波 2 方框濾波 3 高斯濾波 4 中值濾波 5 雙邊濾波 6 小結 噪聲&#xff1a;圖像中的一些干擾因素。通常是由于圖像采集設備、傳輸信道等因素造成的&#xff0c;表現為圖像中隨機的亮度。常見的噪聲類型有高斯噪聲和椒鹽噪聲。高斯噪聲是一種分布符合正態分布…

Vulnhub napping-1.0.1靶機滲透攻略詳解

一、下載靶機 下載地址&#xff1a;https://download.vulnhub.com/napping/napping-1.0.1.ova 下載好后使用VM打開&#xff0c;將網絡配置模式改為net&#xff0c;防止橋接其他主機干擾&#xff08;橋接Mac地址也可確定主機&#xff09;。 二、發現主機 使用nmap掃描沒有相應…

Kubernetes自動擴容方案

Kubernetes 自動擴容可以概括為 “三層六類”&#xff1a;層級類型觸發維度官方/社區方案一句話說明Pod 級HPACPU / 內存 / 自定義 / 外部指標內置副本數橫向擴縮&#xff0c;最常用VPACPU / 內存社區組件單 Pod 資源豎向擴縮&#xff0c;不改副本數KEDA任意事件&#xff08;隊…