自定義線程池-實現任務0丟失的處理策略

設計一個線程池,要求如下:

  1. 隊列最大容量為10(內存隊列)。
  2. 當隊列滿了之后,拒絕策略將新的任務寫入數據庫。
  3. 從隊列中取任務時,若該隊列為空,能夠從數據庫中加載之前被拒絕的任務

模擬數據庫 (TaskDatabase)

  • 使用LinkedBlockingQueue模擬數據庫存儲
  • 線程安全操作(ReentrantLock保證)
  • 實際應用需替換為JDBC/MyBatis等持久化方案
 static class TaskDatabase {private final BlockingQueue<Runnable> dbQueue = new LinkedBlockingQueue<>();private final ReentrantLock dbLock = new ReentrantLock();public void saveTask(Runnable task) {dbLock.lock();try {System.out.println("[DB] 存儲被拒絕任務到數據庫, 當前數據庫任務數: " + (dbQueue.size() + 1));dbQueue.put(task);} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {dbLock.unlock();}}public Runnable loadTask() {dbLock.lock();try {Runnable task = dbQueue.poll();if (task != null) {System.out.println("[DB] 從數據庫加載任務, 剩余數據庫任務: " + dbQueue.size());}return task;} finally {dbLock.unlock();}}}

Runable任務序列化:

public interface SerializableTask extends Serializable {void execute();
}class CustomTask {public static void main(String[] args) {SerializableTask task = () -> System.out.println("Hello, World!");String serializedTask = serializedTask(task);SerializableTask deserialization = deserialization(serializedTask);deserialization.execute();}static String serializedTask(SerializableTask runnable){try(ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(baos)) {// 序列化任務對象oos.writeObject(runnable);return Base64.getEncoder().encodeToString(baos.toByteArray());}catch (Exception e){throw new RuntimeException("無法序列化");}}static SerializableTask deserialization(String serializedTask){// 反序列化任務byte[] data = Base64.getDecoder().decode(serializedTask);try (ByteArrayInputStream bais = new ByteArrayInputStream(data);ObjectInputStream ois = new ObjectInputStream(bais)) {SerializableTask task = (SerializableTask) ois.readObject();return task;}catch (Exception e){throw new RuntimeException("無法反序列化");}}
}

自定義阻塞隊列 (DatabaseBackedBlockingQueue)

  • 繼承LinkedBlockingQueue并重寫關鍵方法
  • take()方法邏輯
    • 優先從內存隊列取任務
    • 隊列為空時從數據庫加載
    • 數據庫也為空時阻塞等待新任務
  • offer()方法 :隊列未滿時接受,滿時返回false觸發拒絕策略
 // 自定義阻塞隊列(支持從數據庫加載任務)static class DatabaseBackedBlockingQueue extends LinkedBlockingQueue<Runnable> {private final TaskDatabase database;public DatabaseBackedBlockingQueue(int maxLocalCapacity, TaskDatabase database) {super(maxLocalCapacity);this.database = database;}@Overridepublic Runnable take() throws InterruptedException {// 1. 優先檢查本地隊列Runnable task = super.poll();if (task != null) return task;// 2. 本地隊列為空時嘗試從數據庫加載while (true) {Runnable dbTask = database.loadTask();if (dbTask != null) return dbTask;// 3. 數據庫為空則等待新任務if (isEmpty()) {task = super.take();  // 阻塞直到有新任務if (task != null) return task;}}}}

拒絕策略 (DatabaseRejectionHandler)

  • 實現RejectedExecutionHandler接口
  • 當內存隊列滿時將任務存入數據庫
  • 任務存入后會被后續的take()方法加載執行
 // 自定義拒絕策略(保存到數據庫)static class DatabaseRejectionHandler implements RejectedExecutionHandler {private final TaskDatabase database;public DatabaseRejectionHandler(TaskDatabase database) {this.database = database;}@Overridepublic void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {System.out.println("[Rejected] 線程池隊列已滿,任務轉入數據庫");database.saveTask(task);}}

資源管理

  • 核心/最大線程數根據容器資源動態調整
  • 線程工廠添加命名前綴(便于監控)
  • 保活時間控制閑置線程銷毀
// 5. 監控線程池狀態
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {System.out.println("\n[監控] 活躍線程: " + executor.getActiveCount()+ " | 隊列大小: " + executor.getQueue().size()+ " | 總完成任務: " + executor.getCompletedTaskCount());
}, 1, 2, TimeUnit.SECONDS);

自定義線程工廠

   // 自定義線程工廠static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger counter = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}@Overridepublic Thread newThread(Runnable r) {return new Thread(r, namePrefix + "-" + counter.getAndIncrement());}}

測試

public static void main(String[] args) throws InterruptedException {// 1. 創建任務數據庫TaskDatabase taskDatabase = new TaskDatabase();// 2. 配置線程池int queueCapacity = 10; // 隊列容量// 3. 創建自定義隊列和拒絕策略DatabaseBackedBlockingQueue workQueue =new DatabaseBackedBlockingQueue(queueCapacity, taskDatabase);ThreadPoolExecutor executor = new ThreadPoolExecutor(2,2,0,TimeUnit.SECONDS,workQueue,new NamedThreadFactory("custom-pool"),new DatabaseRejectionHandler(taskDatabase));// 4. 模擬任務提交int totalTasks = 50;System.out.println("開始提交任務, 總數: " + totalTasks);CountDownLatch countDownLatch = new CountDownLatch(50);for (int i = 1; i <= totalTasks; i++) {final int taskId = i;executor.execute(() -> {try {System.out.println(Thread.currentThread().getName()+ " 執行任務: " + taskId);Thread.sleep(1000); // 模擬任務執行countDownLatch.countDown();} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}// 5. 監控線程池狀態ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();monitor.scheduleAtFixedRate(() -> {System.out.println("\n[監控] 活躍線程: " + executor.getActiveCount()+ " | 隊列大小: " + executor.getQueue().size()+ " | 總完成任務: " + executor.getCompletedTaskCount());}, 1, 2, TimeUnit.SECONDS);// 6. 等待任務執行完成countDownLatch.await();executor.shutdown();monitor.shutdown();System.out.println("剩余數據庫任務: " + taskDatabase.dbQueue.size());}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/87529.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/87529.shtml
英文地址,請注明出處:http://en.pswp.cn/web/87529.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【NLP入門系列四】評論文本分類入門案例

&#x1f368; 本文為&#x1f517;365天深度學習訓練營 中的學習記錄博客&#x1f356; 原作者&#xff1a;K同學啊 博主簡介&#xff1a;努力學習的22級本科生一枚 &#x1f31f;?&#xff1b;探索AI算法&#xff0c;C&#xff0c;go語言的世界&#xff1b;在迷茫中尋找光芒…

Ubuntu安裝ClickHouse

注&#xff1a;本文章的ubuntu的版本為&#xff1a;ubuntu-20.04.6-live-server-amd64。 Ubuntu&#xff08;在線版&#xff09; 更新軟件源 sudo apt-get update 安裝apt-transport-https 允許apt工具通過https協議下載軟件包。 sudo apt-get install apt-transport-htt…

C++26 下一代C++標準

C++26 將是繼 C++23 之后的下一個 C++ 標準。這個新標準對 C++ 進行了重大改進,很可能像 C++98、C++11 或 C++20 那樣具有劃時代的意義。 一:C++標準回顧 C++ 已經有 40 多年的歷史了。過去這些年里發生了什么?這里給出一個簡化版的答案,直到即將到來的 C++26。 1. C++9…

【MySQL】十六,MySQL窗口函數

在 MySQL 8.0 及以后版本中&#xff0c;窗口函數&#xff08;Window Functions&#xff09;為數據分析和處理提供了強大的工具。窗口函數允許在查詢結果集上執行計算&#xff0c;而不必使用子查詢或連接&#xff0c;這使得某些類型的計算更加高效和簡潔。 語法結構 function_…

微型氣象儀在城市環境的應用

微型氣象儀憑借其體積小、成本低、部署靈活、數據實時性強等特點&#xff0c;在城市環境中得到廣泛應用&#xff0c;能夠為城市規劃、環境管理、公共安全、居民生活等領域提供精細化氣象數據支持。一、核心應用場景1. 城市微氣候監測與優化熱島效應研究場景&#xff1a;在城市不…

【仿muduo庫實現并發服務器】eventloop模塊

仿muduo庫實現并發服務器一.eventloop模塊1.成員變量std::thread::id _thread_id;//線程IDPoller _poll;int _event_fd;std::vector<Function<Function>> _task;TimerWheel _timer_wheel2.EventLoop構造3.針對eventfd的操作4.針對poller的操作5.針對threadID的操作…

Redis 加鎖、解鎖

Redis 加鎖和解鎖的應用 上代碼 應用調用示例 RedisLockEntity lockEntityYlb RedisLockEntity.builder().lockKey(TradeConstants.HP_APP_AMOUNT_LOCK_PREFIX appUser.getAccount()).value(orderId).build();boolean isLockedYlb false;try {if (redisLock.tryLock(lockE…

在 Windows 上為 WSL 增加 root 賬號密碼并通過 Shell 工具連接

1. 為 WSL 設置 root 用戶密碼 在 Windows 上使用 WSL&#xff08;Windows Subsystem for Linux&#xff09;時&#xff0c;默認情況下并沒有啟用 root 賬號的密碼。為了通過 SSH 或其他工具以 root 身份連接到 WSL&#xff0c;我們需要為 root 用戶設置密碼。 設置 root 密碼步…

2730、找到最長的半重復子字符穿

題目&#xff1a; 解答&#xff1a; 窗口為[left&#xff0c;right]&#xff0c;ans為窗口長度&#xff0c;same為子串長度&#xff0c;窗口滿足題設條件&#xff0c;即只含一個連續重復字符&#xff0c;則更新ans&#xff0c;否則從左邊開始一直彈出&#xff0c;直到滿足條件…

MCP Java SDK源碼分析

MCP Java SDK源碼分析 一、引言 在當今人工智能飛速發展的時代&#xff0c;大型語言模型&#xff08;LLMs&#xff09;如GPT - 4、Claude等展現出了強大的語言理解和生成能力。然而&#xff0c;這些模型面臨著一個核心限制&#xff0c;即無法直接訪問外部世界的數據和工具。M…

[Linux]內核如何對信號進行捕捉

要理解Linux中內核如何對信號進行捕捉&#xff0c;我們需要很多前置知識的理解&#xff1a; 內核態和用戶態的區別CPU指令集權限內核態和用戶態之間的切換 由于文章的側重點不同&#xff0c;上面這些知識我會在這篇文章盡量詳細提及&#xff0c;更詳細內容還得請大家查看這篇…

設計模式-觀察者模式、命令模式

觀察者模式Observer&#xff08;觀察者&#xff09;—對象行為型模式定義&#xff1a;定義了一種一對多的依賴關系,讓多個觀察者對象同時監聽某一主題對象,在它的狀態發生變化時,會通知所有的觀察者.先將 Observer A B C 注冊到 Observable &#xff0c;那么當 Observable 狀態…

【Unity筆記01】基于單例模式的簡單UI框架

單例模式的UIManagerusing System.Collections; using System.Collections.Generic; using UnityEngine;public class UIManager {private static UIManager _instance;public Dictionary<string, string> pathDict;public Dictionary<string, GameObject> prefab…

深入解析 OPC UA:工業自動化與物聯網的關鍵技術

在當今快速發展的工業自動化和物聯網&#xff08;IoT&#xff09;領域&#xff0c;數據的無縫交換和集成變得至關重要。OPC UA&#xff08;Open Platform Communications Unified Architecture&#xff09;作為一種開放的、跨平臺的工業通信協議&#xff0c;正在成為這一領域的…

MCP 協議的未來發展趨勢與學習路徑

MCP 協議的未來發展趨勢 6.1 MCP 技術演進與更新 MCP 協議正在快速發展&#xff0c;不斷引入新的功能和改進。根據 2025 年 3 月 26 日發布的協議規范&#xff0c;MCP 的最新版本已經引入了多項重要更新&#xff1a; 1.HTTP Transport 正式轉正&#xff1a;引入 Streamable …

硬件嵌入式學習路線大總結(一):C語言與linux。內功心法——從入門到精通,徹底打通你的任督二脈!

嵌入式工程師學習路線大總結&#xff08;一&#xff09; 引言&#xff1a;C語言——嵌入式領域的“屠龍寶刀”&#xff01; 兄弟們&#xff0c;如果你想在嵌入式領域闖出一片天地&#xff0c;C語言就是你手里那把最鋒利的“屠龍寶刀”&#xff01;它不像Python那樣優雅&#xf…

MCP server資源網站去哪找?國內MCP服務合集平臺有哪些?

在人工智能飛速發展的今天&#xff0c;AI模型與外部世界的交互變得愈發重要。一個好的工具不僅能提升開發效率&#xff0c;還能激發更多的創意。今天&#xff0c;我要給大家介紹一個寶藏平臺——AIbase&#xff08;<https://mcp.aibase.cn/>&#xff09;&#xff0c;一個…

修改Spatial-MLLM項目,使其專注于無人機航拍視頻的空間理解

修改Spatial-MLLM項目&#xff0c;使其專注于無人機航拍視頻的空間理解。以下是修改方案和關鍵代碼實現&#xff1a; 修改思路 輸入處理&#xff1a;將原項目的視頻文本輸入改為單一無人機航拍視頻/圖像輸入問題生成&#xff1a;自動生成空間理解相關的問題&#xff08;無需用戶…

攻防世界-Reverse-insanity

知識點 1.ELF文件逆向 2.IDApro的使用 3.strings的使用 步驟 方法一&#xff1a;IDA 使用exeinfo打開&#xff0c;發現是32位ELF文件&#xff0c;然后用ida32打開。 找到main函數&#xff0c;然后F5反編譯&#xff0c;得到flag。 tip&#xff1a;該程序是根據隨機函數生成…

【openp2p】 學習1:P2PApp和優秀的go跨平臺項目

P2PApp下面給出一個基于 RESTful 風格的 P2PApp 管理方案示例,供二次開發或 API 對接參考。核心思路就是把每個 P2PApp 當成一個可創建、查詢、修改、啟動/停止、刪除的資源來管理。 一、P2PApp 資源模型 P2PApp:id: string # 唯一標識name: string # …