并發編程-06之Semaphore

在這里插入圖片描述

一 Semaphore入門
1.1 什么是Semaphore
Semaphore,俗稱信號量,它是操作系統中PV操作的原語在java的實現,它也是基于AbstractQueuedSynchronizer實現的。
Semaphore的功能非常強大,大小為1的信號量就類似于互斥鎖,通過同時只能有一個線程獲取信號量實現。大小為n(n>0)的信號量可以實現限流的功能,它可以實現只能有n個線程同時獲取信號量。

什么是pv操作?
PV操作是操作系統一種實現進程互斥與同步的有效方法。PV操作與信號量(S)的處理相關,P表示通過的意思,V表示釋放的意思。用PV操作來管理共享資源時,首先要確保PV操作自身執行的正確性。
P操作的主要動作是:
①S減1;
②若S減1后仍大于或等于0,則進程繼續執行;
③若S減1后小于0,則該進程被阻塞后放入等待該信號量的等待隊列中,然后轉進程調度。
V操作的主要動作是:
①S加1;
②若相加后結果大于0,則進程繼續執行;
③若相加后結果小于或等于0,則從該信號的等待隊列中釋放一個等待進程,然后再返回原進程繼續執行或轉進程調度。
1.2 Semaphore的常用方法
構造器
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
● permits 表示許可證的數量(資源數)
● fair 表示公平性,如果這個設為 true 的話,下次執行的線程會是等待最久的線程
常用方法
public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
● acquire() 表示嘗試獲取許可(獲取不到則阻塞)。
● tryAcquire() 方法在沒有許可的情況下會立即返回 false,要獲取許可的線程不會阻塞。
● release() 表示釋放許可。
● int availablePermits():返回此信號量中當前可用的許可證數。
● int getQueueLength():返回正在等待獲取許可證的線程數。
● boolean hasQueuedThreads():是否有線程正在等待獲取許可證。
● void reducePermit(int reduction):減少 reduction 個許可證
● Collection getQueuedThreads():返回所有等待獲取許可證的線程集合
1.3 應用場景
可以用于做流量控制,特別是公用資源有限的應用場景
代碼演示:
模擬一個每5S只能處理5個請求的限流Demo
/**

  • 限流測試

  • @author wcy
    */
    @Slf4j
    public class SemaphoneDemo1 {

    /**

    • 實現一個同時只能處理5個請求的限流器
      */
      private static Semaphore semaphore = new Semaphore(5);

    /**

    • 定義一個線程池
      */
      private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
      10, 50,
      60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));

    /**

    • 模擬執行方法
      */
      public static void exec() {
      try {
      semaphore.acquire(1);
      // 模擬真實方法執行
      log.info(“執行exec方法”);
      Thread.sleep(5000);
      } catch (Exception e) {
      e.printStackTrace();
      } finally {
      semaphore.release(1);
      }
      }

    public static void main(String[] args) throws InterruptedException {

     {for (; ; ) {Thread.sleep(100);// 模擬請求以10個/s的速度executor.execute(() -> exec());}}
    

    }
    }
    運行結果:

可以看出,每個周期內只能5個線程執行了方法
二 Semaphore原理
學習Semaphore源碼的時候我們有兩個關注點:

  1. Semaphore的加鎖解鎖(共享鎖)邏輯實現

  2. 線程競爭鎖失敗入隊阻塞邏輯和獲取鎖的線程釋放鎖喚醒阻塞線程競爭鎖的邏輯實現
    加鎖邏輯(acquire)
    public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
    }
    這里調用了同步器的acquireSharedInterruptibly(int arg)方法
    public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
    }
    先看判斷邏輯tryAcquireShared(arg)方法,這是同步器子類實現的
    protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
    }
    再看nonfairTryAcquireShared(acquires)方法
    final int nonfairTryAcquireShared(int acquires) {
    for (;😉 {
    //獲取許可證數量
    int available = getState();
    //減去當前線程使用的許可數
    int remaining = available - acquires;
    if (remaining < 0 ||
    compareAndSetState(available, remaining))
    return remaining;
    }
    }
    返回可用的許可數,如果<0,說明沒有可用的許可,就會進入 doAcquireSharedInterruptibly(arg)方法,這個方法也是同步器實現的
    private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;😉 {
    final Node p = node.predecessor();
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return;
    }
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
    注意:Node node = addWaiter(Node.SHARED);這是構建隊列的方法,但是和ReentrantLock不同的是,這里參數傳的是Node.SHARED,第一個if邏輯是當同步隊列中第一個線程被喚醒后,會進入這里重新競爭鎖,競爭成功后,做出隊的操作,我們假設這里是第一次構建隊列,先看addWaiter(Node.SHARED)方法
    static final Node SHARED = new Node();
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    enq(node);
    return node;
    }
    這里第一個線程入隊,會調用enq方法構建隊列,后來的線程會進入if分支,加入隊列尾部。當前線程Node的nextWaiter=Node.SHARED
    private Node enq(final Node node) {
    for (;😉 {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }
    tail仍然為空,通過cas操作,新建一個頭節點,這就是并發的精髓了,通過一個死循環,第二次循環的時候tail不為空,進入else邏輯,把當前線程所在的節點的前驅節點指向前邊的結點,并把當前線程節點設置為尾結點。(這里通過cas保證線程安全問題),至此,我們的隊列構建完成,回到doAcquireSharedInterruptibly方法中,可以看出,如果當前線程節點的前驅節點如果是頭節點是話還會進行一次cas操作去嘗試獲取許可,假設還沒有線程釋放許可,返回負數,進入第二個if邏輯中,有兩個判斷方法,shouldParkAfterFailedAcquire(p, node)和parkAndCheckInterrupt()方法
    先看shouldParkAfterFailedAcquire(p, node)方法
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
    /*
    * This node has already set status asking a release
    * to signal it, so it can safely park.
    /
    return true;
    if (ws > 0) {
    /

    * Predecessor was cancelled. Skip over predecessors and
    * indicate retry.
    /
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    /

    * waitStatus must be 0 or PROPAGATE. Indicate that we
    * need a signal, but don’t park yet. Caller will need to
    * retry to make sure it cannot acquire before parking.
    /
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }
    先判斷前驅節點的waitStatus是否為-1.如果不是-1,通過cas操作改為-1,返回false,外邊是一個死循環,會第二次進入這個方法,這次判斷為-1,返回true,進入parkAndCheckInterrupt()方法,
    private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
    }
    在這里,我們的線程就阻塞著了。
    解鎖邏輯(release):
    public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
    }
    接著看releaseShared方法,這個是由同步器來實現的
    public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
    }
    return false;
    }
    接著看tryReleaseShared,這個是同步器子類實現的,主要目的就是釋放一個資源許可。
    protected final boolean tryReleaseShared(int releases) {
    for (;😉 {
    int current = getState();
    int next = current + releases;
    if (next < current) // overflow
    throw new Error(“Maximum permit count exceeded”);
    if (compareAndSetState(current, next))
    return true;
    }
    }
    這里釋放鎖后,許可加1,執行doReleaseShared()方法
    doReleaseShared()方法,
    private void doReleaseShared() {
    /

    * Ensure that a release propagates, even if there are other
    * in-progress acquires/releases. This proceeds in the usual
    * way of trying to unparkSuccessor of head if it needs
    * signal. But if it does not, status is set to PROPAGATE to
    * ensure that upon release, propagation continues.
    * Additionally, we must loop in case a new node is added
    * while we are doing this. Also, unlike other uses of
    * unparkSuccessor, we need to know if CAS to reset status
    * fails, if so rechecking.
    /
    for (;😉 {
    Node h = head;
    if (h != null && h != tail) {
    int ws = h.waitStatus;
    if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue; // loop to recheck cases
    unparkSuccessor(h);
    }
    else if (ws == 0 &&
    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue; // loop on failed CAS
    }
    if (h == head) // loop if head changed
    break;
    }
    }
    把頭節點的waitStatus置為0,調用unparkSuccessor方法
    unparkSuccessor方法
    private void unparkSuccessor(Node node) {
    /

    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling. It is OK if this
    * fails or if status is changed by waiting thread.
    */
    int ws = node.waitStatus;
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

     /** Thread to unpark is held in successor, which is normally* just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);
    

    }
    拿到頭結點的下一個節點,喚醒同步隊列中阻塞的第一個線程,此時又會回到阻塞的地方doAcquireSharedInterruptibly方法中
    private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;😉 {
    final Node p = node.predecessor();
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return;
    }
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
    這時候,被阻塞的第一個線程被喚醒,重新進入循環,會進入第一個if中,此時調用tryAcquireShared方法可以拿到一個許可,也就是r>=0,
    然后調用setHeadAndPropagate方法,這就是共享鎖和獨占鎖的區別之一
    setHeadAndPropagate
    private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    //設置新的頭節點
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
    //喚醒下一個節點
    doReleaseShared();
    }
    }
    在這里先把當前線程的節點設置為新的頭節點,再次嘗試喚醒下一個節點,這樣有個好處,就是資源釋放得快的話,線程就持續被喚醒,這也就保證了Semaphone可以限流的原因,同時刻,只要有線程釋放資源,其他線程就可以拿到許可進而執行自己的業務。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/44653.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/44653.shtml
英文地址,請注明出處:http://en.pswp.cn/web/44653.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

centos部署jar包

第一步&#xff1a; 將IDEA中的項目打包為jar,將這個jar文件放到centos服務器上的目錄里&#xff0c;我在opt新建api目錄&#xff0c;將jar文件放入&#xff0c;如下圖&#xff1a; 第二步&#xff1a; 將需要讀取的配置文件也放入此目錄(其他目錄也可以&#xff0c;和腳本中…

【筆記】記一次讀寫分離之shardingsphere.datasource導致數據源為空錯誤

錯誤&#xff1a; *************************** APPLICATION FAILED TO START *************************** Description: Failed to configure a DataSource: url attribute is not specified and no embedded datasource could be configured. Reason: Failed to determin…

搭建RAG系統就這么簡單:LangChain|RAG是什么?

RAG是什么 “RAG”&#xff08;Retrieval-Augmented Generation&#xff09;是一種結合了檢索&#xff08;Retrieval&#xff09;和生成&#xff08;Generation&#xff09;的人工智能技術&#xff0c;它在大模型中被需要的原因包括&#xff1a; 知識豐富性&#xff1a; 大模…

探索數據結構與算法的奇妙世界 —— Github開源項目推薦《Hello 算法》

在浩瀚的編程與計算機科學領域中&#xff0c;數據結構與算法無疑是每位開發者攀登技術高峰的必經之路。然而&#xff0c;對于初學者而言&#xff0c;這條路往往布滿了荊棘與挑戰。幸運的是&#xff0c;今天我要向大家推薦一個令人振奮的項目——《Hello Algo》&#xff0c;它正…

ubuntu使用kubeadm搭建k8s集群

一、卸載k8s kubeadm reset -f modprobe -r ipip lsmod rm -rf ~/.kube/# 自己選擇性刪除 坑點哦 rm -rf /etc/kubernetes/ rm -rf /etc/systemd/system/kubelet.service.d rm -rf /etc/systemd/system/kubelet.service rm -rf /usr/bin/kube* rm -rf /etc/cni rm -rf /opt/cn…

C# Winform 自定義事件實戰

在C#的WinForms中&#xff0c;自定義事件是一種強大的工具&#xff0c;它允許你創建自己的事件&#xff0c;從而在特定條件下通知訂閱者。自定義事件通常用于封裝業務邏輯&#xff0c;使代碼更加模塊化和易于維護。下面我將通過一個實戰例子來展示如何在WinForms中創建和使用自…

多線程編程中的條件變量及其優化

本套課在線學習視頻&#xff08;網盤地址&#xff0c;保存到網盤即可免費觀看&#xff09;&#xff1a; 鏈接&#xff1a;https://pan.quark.cn/s/7220b198cf00 在多線程編程中&#xff0c;條件變量是一種用于線程間通信和同步的機制。通過使用條件變量&#xff0c;可以有效地…

Prometheus + alermanager + webhook-dingtalk 告警

添加釘釘機器人 1. 部署 alermanager 1.1 下載軟件包 wget https://github.com/prometheus/alertmanager/releases/download/v0.26.0/alertmanager-0.26.0.linux-amd64.tar.gz 網址 &#xff1a;Releases prometheus/alertmanager (github.com) 1.2 解壓軟件包 mkdir -pv …

醫日健集團技術力量體現測試的背后

醫日健集團覆蓋式更新 科技日新月異的時代&#xff0c;醫日健集團始終走在行業的前列。近日&#xff0c;醫日健集團外勤技術人員全面對市場點位投放的數智藥房進行了新系統升級和機器測試&#xff0c;這是醫日健對于科技創新的最新嘗試。 以客戶體驗為核心優化新體驗 醫日健集團…

NCNN源碼學習(1):Mat詳解

前言:最原始的發行版本代碼比較簡潔,我們從2017年ncnn第一次開源的版本閱讀mat的源碼。閱讀源碼味如嚼蠟,下面就開始吧! 目錄 構造函數 內存分配 數據成員 申請和釋放內存 引用計數 輔助函數 填充函數fill 參考 構造函數 ncnn提供了8種構造函數的方式。 // emptyM…

Js 前置,后置補零的原生方法與補字符串 padStart及padEnd

在工作中&#xff0c;遇到了需要將不滿八位的一個字符串進行后補0的操作&#xff0c;所以就在網上學習了關于js原生補充字符串的方法&#xff0c;然后用這篇博客記錄下來。 目錄 前置補充字符串 String.prototype.padStart() 后置補充字符串String.prototype.padEnd() 前置補…

將獨熱碼應用到神經網絡中

引言 接上回&#xff0c;本文繼續說如何用TensorFlow將獨熱編碼應用到一個簡單的神經網絡中&#xff0c;以實現從一段隨機文本到另一段隨機文本的轉換。 步驟一&#xff1a;導入庫 import tensorflow as tf import numpy as np import random import string步驟二&#xff1…

【超音速 專利 CN117710683A】基于分類模型的輕量級工業圖像關鍵點檢測方法

申請號CN202311601629.7公開號&#xff08;公開&#xff09;CN117710683A申請日2023.11.27申請人&#xff08;公開&#xff09;超音速人工智能科技股份有限公司發明人&#xff08;公開&#xff09;張俊峰(總); 楊培文(總); 沈俊羽; 張小村 技術領域 本發明涉及圖像關鍵點檢測…

數據庫MySQL下載安裝

MySQL下載安裝地址如下&#xff1a; MySQL :: Download MySQL Community Server 1、下載界面 2、點擊下載 3、解壓記住目錄 4、配置my.ini文件 未完..

C語言課程回顧:九、C語言之預處理命令

9 預處理命令 9 預處理命令9.1 概述9.2 宏定義9.2.1 無參宏定義9.2.2 帶參宏定義 9.3 文件包含9.4 條件編譯9.5 本章小結9.6 擴展 10種軟件濾波方法的示例程序1、限副濾波2、中位值濾波法3、算術平均濾波法4、遞推平均濾波法&#xff08;又稱滑動平均濾波法&#xff09;5、中位…

Vue.js學習筆記(五)抽獎組件封裝——轉盤抽獎

基于VUE2轉盤組件的開發 文章目錄 基于VUE2轉盤組件的開發前言一、開發步驟1.組件布局2.布局樣式3.數據準備 二、最后效果總結 前言 因為之前的轉盤功能是圖片做的&#xff0c;每次活動更新都要重做UI和前端&#xff0c;為了解決這一問題進行動態配置轉盤組件開發&#xff0c;…

【jvm】字符串常量池問題

目錄 一、基本概念1.1 說明1.2 特點 二、存放位置2.1 JDK1.6及以前2.2 JDK1.72.3 JDK1.8及以后 三、工作原理3.1 創建字符串常量3.2 使用new關鍵字創建字符串 四、intern()方法4.1 作用 五、優點六、字節碼分析6.1 示例16.1.1 代碼示例6.1.2 字節碼6.1.3 解析 6.2 示例26.2.1 代…

STM32智能倉儲管理系統教程

目錄 引言環境準備晶智能倉儲管理系統基礎代碼實現&#xff1a;實現智能倉儲管理系統 4.1 數據采集模塊 4.2 數據處理與決策模塊 4.3 通信與網絡系統實現 4.4 用戶界面與數據可視化應用場景&#xff1a;倉儲管理與優化問題解決方案與優化收尾與總結 1. 引言 智能倉儲管理系統…

7 月12日學習打卡--棧和隊列的相互轉換

hello大家好呀&#xff0c;本博客目的在于記錄暑假學習打卡&#xff0c;后續會整理成一個專欄&#xff0c;主要打算在暑假學習完數據結構&#xff0c;因此會發一些相關的數據結構實現的博客和一些刷的題&#xff0c;個人學習使用&#xff0c;也希望大家多多支持&#xff0c;有不…

什么是STM32?嵌入式和STM32簡單介紹

1、嵌入式和STM32 1.1.什么是嵌入式 除了桌面PC之外&#xff0c;所有的控制類設備都是嵌入式 嵌入式系統的定義&#xff1a;“用于控制、監視或者輔助操作機器和設備的裝置”。 嵌入式系統是一個控制程序存儲在ROM中的嵌入式處理器控制板&#xff0c;是一種專用的計算機系統。…