JDK-調度線程池

歸檔

  • GitHub: JDK-調度線程池

使用示例

  • https://github.com/zengxf/small-frame-demo/blob/master/jdk-demo/simple-demo/src/main/java/test/jdkapi/juc/thread_pool/TestSchedule.java

JDK 版本

openjdk version "17" 2021-09-14
OpenJDK Runtime Environment (build 17+35-2724)
OpenJDK 64-Bit Server VM (build 17+35-2724, mixed mode, sharing)

原理

類結構

  • java.util.concurrent.ScheduledThreadPoolExecutor
/*** 調度線程池 */
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService 
{/*** 構造器 */public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue()); // 使用內部隊列作任務隊列 sign_c_001}}
  • java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue
    /*** sign_c_001 延遲任務隊列 */static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {private static final int INITIAL_CAPACITY = 16; // 初始隊列大小 16 個private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private final ReentrantLock lock = new ReentrantLock();private final Condition available = lock.newCondition();}
  • java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
    /*** sign_c_010 調度任務 */private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> // 繼承 sign_c_011 實現 sign_i_011{private final long sequenceNumber;  // 序列號(添加時遞增設置)private volatile long time;         // 觸發時間(執行的時間點),基于納米/*** 重復任務的周期,以納秒為單位。* 正值表示固定速率執行。* 負值表示固定延遲執行。* 值為 0 表示非重復(單次)任務。*/private final long period;// 構造器ScheduledFutureTask(Runnable r, V result, long triggerTime,long period, long sequenceNumber) {super(r, result); // 執行體交給父類(sign_c_011)保存this.time = triggerTime;this.period = period;this.sequenceNumber = sequenceNumber;}}
  • java.util.concurrent.FutureTask
/*** sign_c_011 */
public class FutureTask<V> implements RunnableFuture<V> {private volatile int state;     // 狀態private Callable<V> callable;   // 封裝的任務public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable}
}
  • Future 相關接口定義
// sign_i_011
// java.util.concurrent.RunnableScheduledFuture
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {/*** 判斷是不是周期任務*/boolean isPeriodic();
}// java.util.concurrent.ScheduledFuture
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}// java.util.concurrent.Delayed
public interface Delayed extends Comparable<Delayed> {/*** 獲取延時時間,* Comparable 用于排序,延時小的排前面。*/long getDelay(TimeUnit unit);
}
  • java.util.concurrent.ThreadPoolExecutor.Worker
    /*** sign_c_020 工作者(線程) */private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{final Thread thread;Runnable firstTask;volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); // 創建一個線程,自己為執行體 sign_m_030}}

調用鏈

  • java.util.concurrent.ScheduledThreadPoolExecutor
    /*** 創建調度任務 */public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {... // 省略校驗ScheduledFutureTask<Void> sft = // 創建調度任務 sign_c_010 new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period),sequencer.getAndIncrement());RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}// 延遲執行private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task); // 添加到隊列if (!canRunInCurrentRunState(task) && remove(task))task.cancel(false);elseensurePrestart();       // 啟動任務執行者線程 sign_sm_001}}
  • java.util.concurrent.ThreadPoolExecutor
    // sign_sm_001 啟動任務執行者線程 void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)addWorker(null, true);  // 添加工作線程else if (wc == 0)addWorker(null, false);}// 添加工作線程private boolean addWorker(Runnable firstTask, boolean core) {... // 省略其他處理boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);  // 創建工作者 sign_c_020final Thread t = w.thread;if (t != null) {......workers.add(w); // 添加到工作者集合...t.start();          // 啟動線程...}} ... // finallyreturn workerStarted;}// sign_m_022 執行隊列中的任務 final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;... // 省略其他try {while (task != null || (task = getTask()) != null) { // 獲取隊列中的任務w.lock();... // 線程中斷處理try {beforeExecute(wt, task);        // 調用執行前的鉤子函數try {task.run();                 // 執行任務體(相當于定時任務的執行體 sign_m_040)afterExecute(task, null);   // 調用執行后的鉤子函數} ... // catch} finally {task = null;w.completedTasks++; // 完成的任務數 +1w.unlock();}}...} ... // finally}private Runnable getTask() {...for (;;) {... // 省略其他判斷處理try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();   // 默認調用 take(): 獲取任務或限時等待 sign_m_050if (r != null)return r;...} ... // catch }}
  • java.util.concurrent.ThreadPoolExecutor.Worker
        // sign_m_030 執行體 public void run() {runWorker(this);    // 執行隊列中的任務 sign_m_022}
  • java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
        /*** sign_m_040 定時任務執行體 */public void run() {if (!canRunInCurrentRunState(this))cancel(false);else if (!isPeriodic())super.run();else if (super.runAndReset()) {     // 執行任務具體邏輯setNextRunTime();               // 設置任務下次被執行的時間reExecutePeriodic(outerTask);   // 重新添加到(當前線程池的)隊列}}
  • java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue
        /*** sign_m_050 從隊列中獲取任務(或限時等待) */public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null)available.await();  // 沒有任務則無限等待else {long delay = first.getDelay(NANOSECONDS);   // 需要延時的時間,實例 ref: sign_c_010if (delay <= 0L)return finishPoll(first);   // 小于 0 表示不需要延時,直接返回first = null; // don't retain ref while waitingif (leader != null)available.await();  // 相當于有線程在等待延時,就無限等待else {Thread thisThread = Thread.currentThread();leader = thisThread;    // 相當于設置標識(給其他線程判斷是不是要無限等待)try {available.awaitNanos(delay);    // 等待需要延時的時間} finally {if (leader == thisThread)leader = null;  // 相當于清空標識}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}

總結

  • 線程等待是在隊列 take() 方法中處理
    • 等待時延是由 ScheduledFutureTask #getDelay() 進行判斷
    • 而不是通過 DelayQueue 實現,但底層原理一樣
  • 重復延時執行的任務,每次執行完,會重新添加到隊列中

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/36291.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/36291.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/36291.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

邊緣計算VNC智能盒子如何助力HMI設備實現二次開發?

HMI&#xff08;Human-Machine Interface&#xff09;又稱人機界面&#xff0c;是用戶與機器之間交互和通信的媒介。今天帶你了解智能盒子如何助力HMI設備實現二次開發&#xff1f; HMI設備被廣泛應用在工業自動化中&#xff0c;具有顯示設備信息&#xff0c;實時監測&#xf…

python爬蟲--scrapy框架

Scrapy 一 介紹 Scrapy簡介 1.Scrapy是用純Python實現一個為了爬取網站數據、提取結構性數據而編寫的應用框架&#xff0c;用途非常廣泛2.框架的力量&#xff0c;用戶只需要定制開發幾個模塊就可以輕松的實現一個爬蟲&#xff0c;用來抓取網頁內容以及各種圖片&#xff0c;非…

GPT-5對普通人有何影響

這篇文章對ChatGPT的使用方法和提問技巧進行了討論&#xff0c;重點強調了背景信息和具體提問的重要性。文章清晰地傳達了如何提高ChatGPT回答的質量&#xff0c;以及個人在使用ChatGPT時的體會和建議。然而&#xff0c;文章在邏輯組織和表達方面還有一些可以改進的地方&#x…

Spring Boot與分布式事務的最佳實踐

Spring Boot與分布式事務的最佳實踐 大家好&#xff0c;我是免費搭建查券返利機器人省錢賺傭金就用微賺淘客系統3.0的小編&#xff0c;也是冬天不穿秋褲&#xff0c;天冷也要風度的程序猿&#xff01;今天我們來探討在Spring Boot應用中如何實現分布式事務的最佳實踐。 什么是…

Android Launcher-----MainThreadInitializedObject介紹

MainThreadInitializedObject 是 Android 開發中用于確保對象在主線程上初始化的一種設計模式 一、用途 MainThreadInitializedObject 通常用于確保那些需要在主線程上創建的對象&#xff08;比如UI組件或依賴于主線程環境的對象&#xff09;能夠安全地進行初始化 二、優點 …

LeetCode.438找到字符串中所有字母異位詞

問題描述 給定兩個字符串s和p&#xff0c;找到s中所有p的 異位詞 的子串&#xff0c;返回這些子串的起始索引。不考慮答案輸出的順序。 異位詞 指由相同字母重排列形成的字符串&#xff08;包括相同的字符串&#xff09;。 解題思路1 注意&#xff1a;該解題思路是錯誤的&am…

Microsoft VBA Excel 操控 Access資料表和查詢代碼進行搬運操作

問題場景 Run_NoSource_AddressSource_FileDestination_AddressDestination_FileCopy_IndicatorRun_Start_Time1C:\Users\EP\path\to\FileSSS-1.MDBC:\Users\EP\path\to\FileSSC-1.MDBY2C:\Users\EP\path\to\FileSSS-2.MDBC:\Users\EP\path\to\FileSSC-2.MDBY3C:\Users\EP\pat…

NC參照 根據名稱轉換為主鍵值,如部門、人員等參照根據部門名稱、人員名稱獲取對應的主鍵值

NC參照 根據名稱轉換為主鍵值&#xff0c;如部門、人員等參照根據部門名稱、人員名稱獲取對應的主鍵值 private BillCardPanel getEditBillCardPanel() {return getEditor().getBillCardPanel(); }private BillData getEditorBillData() {return this.getEditor().getBillCard…

靜態庫和動態庫

1、編譯過程 1.預處理&#xff1a;解釋并展開源程序當中的所有的預處理指令&#xff0c;此時生成 *.i 文件。 2.編譯&#xff1a;詞法和語法的分析&#xff0c;生成對應硬件平臺的匯編語言文件&#xff0c;此時生成 *.s 文件。 3.匯編&#xff1a;將匯編語言文件翻譯為對應處理…

便攜式煙氣監測儀的應用主要有哪些?

煙氣分析儀是一種用于檢測和分析煙氣中各種成分和污染物含量的儀器&#xff0c;通過采集和處理煙氣樣品&#xff0c;對其中的各種成分進行定量分析。那么&#xff0c;便攜式煙氣監測儀的應用主要有哪些&#xff1f;為方便大家了解&#xff0c;下面就讓小編來為大家簡單介紹一下…

2-2到2-4

計算出所有人的平均年齡&#xff1a; val lines sc.textFile("/root/data/scala/people/page.txt") val count lines.count() val total lines.map(line > line.split(" ")(1)).map(t>t.trim.toInt).collect().reduce((a,b)>ab) val avgAge …

如何防止SQL注入

為了防止SQL注入攻擊&#xff0c;可以采取以下一系列的安全措施&#xff0c;這些措施結合了多篇參考文章中的關鍵信息和方法&#xff1a; 使用參數化查詢或預編譯語句&#xff1a; 這是防止SQL注入的最常見且最有效的方法之一。通過將用戶輸入的數據作為參數傳遞給SQL查詢語句…

[Python]根據文件路徑獲取文件所在目錄、文件名和后綴名

一、簡介 本文介紹了在python中如何根據文件的路徑名字&#xff08;字符串&#xff09;獲取文件所在目錄名、文件名&#xff08;帶后綴&#xff09;、文件名&#xff08;無后綴&#xff09;和文件后綴名。 二、代碼 假設文件路徑為/home/user/temp.txt&#xff0c;使用以下代…

壓縮pdf文件大小的方法,如何壓縮pdf格式的大小

pdf太大怎么壓縮&#xff1f;當你需要通過電子郵件發送一個PDF文件&#xff0c;卻發現文件太大無法成功發出時&#xff0c;這些情況下&#xff0c;我們都需要找到一種方法來壓縮PDF文件&#xff0c;以便更便捷地進行分享和傳輸。PDF文件的大小通常與其中包含的圖片、圖形和文本…

入門JavaWeb之 Response 下載文件

web 服務器接收到客戶端的 http 請求 針對這個請求&#xff0c;分別創建一個代表請求的 HttpServletRequest 對象&#xff0c;代表響應的 HttpServletResponse 對象 獲取客戶端請求過來的參數&#xff1a;HttpServletRequest 給客戶端響應一些信息&#xff1a;HttpServletRe…

數據庫索引失效的11種情況

MySQL中 提高性能 的一個最有效的方式是對數據表 設計合理的索引。索引提供了高效訪問數據的方法&#xff0c;并且加快查詢的速度&#xff0c;因此索引對查詢的速度有著至關重要的影響。使用索引可以 快速地定位 表中的某條記錄&#xff0c;從而提高數據庫査詢的速度&#xff0…

js獲取選中區域(window.getSelection的基本使用)

返回一個 Selection 對象&#xff0c;表示用戶選擇的文本范圍或光標的當前位置。 const selection window.getSelection() 1.toString() //光標選中的文本 const selectedText selection.toString() 2.getRangeAt() //返回一個包含當前選區內容的區域對象。 selection…

數據與文字的表示方法

目錄 1. 數據格式 1. 文本文件格式 2. 二進制文件格式 3. 數據庫格式 4. 壓縮格式 2. 數字機器碼表示 整數表示 浮點數表示 3. 字符與數組的表示方法 1. ASCII&#xff08;美國信息交換標準代碼&#xff09; 2. 擴展ASCII 3. Unicode 4. UTF-8&#xff08;8 位 Uni…

面試相關-接口測試常問的問題

1.為什么要做接口測試 (1)現在大多系統都是前后端分離的項目,前端和后端的進度可能不一樣,那為了盡早的進入測試,前端界面沒有開發完成的情況下,只要后端的接口開發完了,就可以提前做接口測試了; (2)基于安全考慮,只依賴前端進行限制,已經完全不滿足系統的安全性…

Power Pivot——常用DAX 函數

常用DAX 函數 以下這些函數是 DAX 中最常用的一部分&#xff0c;通過熟練掌握這些函數&#xff0c;你可以有效地進行數據分析和建模。 聚合函數 (Aggregation Functions) SUM() 用途&#xff1a;對指定列中的所有數值求和。 語法&#xff1a;SUM() 示例&#xff1a;SUM(Sale…