多線程代碼案例-2 阻塞隊列

阻塞隊列

通過數據結構的學習,我們都知道了隊列是一種“先進先出”的數據結構。阻塞隊列,是基于普通隊列,做出擴展的一種特殊隊列。

特點

1、線程安全的

2、具有阻塞功能:1、如果針對一個已經滿了的隊列進行入隊列,此時入隊列操作就會阻塞,一直阻塞到隊列不滿(其他線程出隊列元素)之后。2、如果針對一個已經空了的隊列進行出隊列,此時出隊列操作就會阻塞,一直阻塞到隊列不空(其他線程入隊列元素)之后。


基于阻塞隊列我們能夠實現“生產者消費者模型”。

生產者消費者模型

什么是生產者消費者模型呢?

舉個生活中的例子:

包餃子的流程:

1、和面(一般都是一個人(線程),沒辦法多個人(線程)完成)

2、搟餃子皮

3、包餃子(2和3都是可以多個人(線程)完成的)

現在有A、B、C三位老鐵,共同完成包餃子的任務,搟面杖,一般一個家庭中只有一個搟面杖,所以會發生,三個線程同時去競爭這個搟面杖,A老鐵拿到搟面杖搟皮了,B、C就需要阻塞等待,所以,包餃子的流程非常適合多線程的方式來實現,即A和完面,B負責搟皮,A(此時A已經釋放了搟面杖)、C負責包餃子。B搟一個皮,A或者C就能包一個餃子……

這里的分工協作,就構成了生產者消費者模型,搟餃子皮的線程就是生產者(生產餃子皮),搟完一個餃子皮 ,餃子皮數目+1,另外兩個包餃子的線程,就是消費者(消費餃子皮),包完一個餃子,餃子皮數量-1。

中間的桌子就起到了“傳遞餃子皮”的效果,這個桌子的角色就相當于“阻塞隊列”。

假設:搟餃子皮的線程速度非常快,而包餃子的人包的很慢。就會導致桌子上的餃子皮越來越多,一直這樣下去,桌子上的餃子皮就會滿了。此時搟餃子皮的人就得停下來等等,等這兩個包餃子皮的人,用掉一些餃子皮,再接著搟……

反之,搟餃子皮的非常慢,包餃子的人包得非常快,就會導致桌子上的餃子皮所剩無幾,一直這樣下去,桌子上就沒有餃子皮了。此時包餃子的人就得等一等,等搟餃子皮的人搟出餃子皮,再接著包……

上述例子,大概模擬了生產者消費者模型。

作用

1、解耦合

引入生產者消費者模型,就可以更好地做到“解耦合”。

耦合度:代碼中不同的模塊,類函數之間相互依賴,相互關聯的緊密程度。

耦合度低:模塊之間的關聯關系弱,相互影響小。一個模塊的修改不容易影響其他模塊,各個模塊之間可以相對獨立進行開發……

耦合度高:模塊之間的關聯關系強,一個模塊的修改往往會導致其他多個模塊也需要相應的修改,代碼的維護和擴展難度大……

在實際開發中,我們追求的是“高內聚,低耦合”,此時就可以使用阻塞隊列降低耦合度。

?實際開發中,經常會涉及到“分布式系統”:服務器整個功能不是由一個服務器全部完成的,而是每個服務器負責一部分功能,通過服務器之間的網絡通信,最終完成整個功能。

圖示:

上述模型中:A和B、A和C之間的耦合度是比較強的,A代碼中需要涉及到一些和B相關的操作,B的代碼中也涉及到一些和A的操作。同樣,A的代碼中需要涉及和C的操作,C的代碼也涉及到和A的操作。此時,如果B或者A“掛了”,此時對A的影響就很大,A可能也就跟著“掛”了。

此時,就可以引入阻塞隊列來降低A、B、C三者間的耦合度從而降低上述事故發生的概率。

引入阻塞隊列后,A和B、A和C之間就都不是直接交互的了,而是通過隊列在中間進行傳話。此時,A只需要和隊列交互就可以了,同理,B、C中的代碼也是跟隊列交互就可以了。

如果B、C“掛了”,對于A的影響是微乎其微的……假設后續要增加一個D,A的代碼是不用發生任何變化的。

引入生產者消費者模型,降低耦合度之后,也是需要付出一些代價的:需要加機器,引入更多的硬件資源。

1、上述描述的阻塞隊列,并非是簡單的數據結構,而是基于這個數據結構實現的服務器程序,又被部署到單獨的主機上了。我們稱這種為“消息隊列”(message queue,簡稱“mq”)。?

2、整個系統的結構更復雜了,需要維護的服務器更多了。

3、效率問題:引入中間的“阻塞隊列”,請求從A發出到B收到,B返回響應到A都需要花費一定的時間。

2、削峰填谷

三峽水壩,是一項非常厲害的工程。

它的一項功能,就是能使上游的水按照一定的速率向下游排放。

如圖所示:

如果上游的降雨量突然增大,那上游的水就會以一個極快的速度沖向下游,對中下游,造成很大的危害。三峽工程,在中間建立了一個水庫

?有了這個水庫之后,即使上游的水非常湍急,但是在中游也被三峽大壩給攔住了,三峽大壩本身就是一個水庫,可以存儲很多的水,然后,我們就可以進行調控,使得三峽按照一定的速率,往下游防水。上游降雨驟增,三峽大壩就可以關閘蓄水;上游降雨驟減,三峽大壩就可以開閘放水。從而達到削峰填谷的效果。(此處的峰和谷,都不是長時間持續的,而是短時間內出現的)。?

回到開發之中:?

上面是一個分布式系統的大致模型,我們需要考慮的是,當外網的請求突然增多時,即A接收到的請求驟增,此時A的壓力就會變大,但因為A做的工作比較簡單,每個請求消耗的資源是比較少的,但是B和C多久不一定了,他們的壓力也會變大,假設B是用戶服務器,需要從數據庫中找到對應的用戶信息,C是商品服務器,也需要從數據庫中找到對應的商品,還需要進行一些匹配和過濾工作等等

A的抗壓能力比較強,B、C的抗壓能力比較弱(他們需要完成的工作更加復雜,每個請求消耗的資源更多),因此一旦外界的請求出現突發的峰值,就可能導致B、C服務器直接掛掉了……

那當請求多的時候,服務器為什么會掛掉呢?

服務器處理每個請求,都是需要消耗硬件資源的(包括但不限于CPU、內存、網絡帶寬……)即使一個請求消耗的資源比較少,但也無法承受住,同時會有很多的請求,加到一起來,這樣消耗的總資源就多了。上述任何一種硬件資源達到瓶頸,服務器都會掛(用戶發出請求,服務器無響應)

我們就可以使用阻塞隊列/消息隊列來盡量避免突發的高請求導致的服務器過載~~(阻塞隊列:是以數據結構的視角命名。消息隊列:是基于阻塞隊列實現服務器程序的視角命名的)……?

當在A與B、C之間添加一個阻塞隊列之后,因為阻塞隊列的特性,即使外界請求出現峰值,也是由阻塞隊列來承擔峰值的請求,B和C(下游)仍然可以按照之前的速度來獲取請求,這樣就可以有效防止B和C被峰值沖擊導致服務器“掛掉”。

但是當請求太多的時候,接收請求的服務器(即A服務器)也是可能會掛的。請求一直往上增加,A肯定也會有頂不住的時候,此時可以在A的前面再添加一個阻塞隊列,但當請求進一步增加,隊列也是可能掛的(我們可以引入更多的硬件資源,以避免上述情況)。

BolockingQueue的使用

Java標準庫中提供了線程的阻塞隊列的數據結構:

BolockingQueue是一個interface(接口)?,下面有三個具體的實現類:

代碼示例:

public class ThreadDemo28 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);//帶阻塞功能的queue.put("aaa");String elem = queue.take();System.out.println("elem:"+elem);String elem1 = queue.take();//阻塞System.out.println("elem:"+elem);//沒有阻塞的獲取隊首元素的方法}
}

?可以看到此時,打印“aaa”之后,進程并沒有結束,說明在取出第2個元素時發生了阻塞。

注意:使用put和offer一樣都是入隊列,但是put是帶有阻塞功能的,offer是沒有阻塞功能的(隊列滿了則返回false),take方法是用來出隊列的,也是帶有阻塞功能的。但是在阻塞隊列中,并沒有提供帶有阻塞功能的獲取隊首元素的方法。

實現一個BlockingQueue

我們可以基于數組來實現隊列的數據結構(環形隊列)。

環形隊列有兩個引用:head(指向頭)和tail(指向尾)

?每次插入數據的時候,將數據插入tail的位置,然后tail往后走

一直走到數組滿了之后?

?因為我們要實現的是環形隊列,所以要判斷隊列是否為滿,有兩種方法:

1、浪費一個格子,tail至多走到head的前一個位置。

2、引入size變量

代碼實現:

1、成員變量和構造方法?

class MyBlockingQueue2{private String[] elem = null;private int head = 0;private int tail = 0;private int size = 0;public MyBlockingQueue2(int capcity){elem = new String[capcity];}
}

2、 put方法和take方法

put方法的初始代碼(使用size來判斷隊列是否為滿):

在put方法中判斷是否未滿,是有兩種寫法的,一種就是像我們上面寫的那樣if(tail >= elems.length)讓tail = 0,另一種是tail = tail % elems.length(如果tail < elems.length,此時求余的值,就是tail原來的值,如果tail==elems.length,求余的值就是0).

上述兩種方法都能完成我們的任務,那如何評價某個代碼好還是不好呢?

1、開發效率(代碼是否容易被理解,可讀性高不高)

2、運行效率(代碼的執行速度快不快)

分析上面兩種代碼,從可讀性上看,if語句,只要是個程序員,絕大多數都認識if條件,但不認識%,還是有可能的,尤其是在不同的編程語言中,%的作用還可能不太一樣。從運行效率上看,if是條件跳轉語句(執行速度快),大多數情況下,并不會觸發賦值。但是%,本質上是觸發運算指令,除法運算本身屬于比較低效的指令(CPU更擅長計算+、-),而且第二種代碼是百分之百觸發賦值操作的,運行效率會低一些。

綜上,使用if是更好的方法。

解決線程安全問題——引入鎖?

前面if語句中需要阻塞的代碼先不考慮,后面的代碼全都是針對數組進行寫操作,是線程不安全的,一定要加上鎖。

那么向上面這樣加鎖,就線程安全了嗎??

模擬調度過程,如圖所示:

如果這個此時這個數組只剩下最后一個位置了:?所以我們synchronized需要加在最外面的括號中的,這樣就和加到方法上的本質是一樣的:

使得當前代碼能夠阻塞:

說到阻塞,我們就想到了可以用之前的wait來實現阻塞。 巧了,此處的wait正好在鎖內部,可以使用當前的鎖對象來wait。

光有wait還不夠,還需要其他線程對wait進行喚醒(隊列如果沒有滿,就可以進行喚醒操作了)。那什么時候隊列不滿了呢?出隊成功不就是隊列不滿了嘛~~

此時,我們就可以根據剛才的put方法來實現take方法,并在出隊成功后對put方法中的wait進行喚醒;同理在入隊成功后在put方法中對take方法中的wait進行喚醒。

take方法:

put方法 :

這樣看起來,我們的代碼是不是就大功告成了呢??

并沒有,我們期望的情況是這樣的:

但是,不同線程之間,put和take方法的notify可能會不正確地將錯誤的wait給喚醒。

比如下面這種情況:A線程中,入隊列的喚醒操作,把B線程中入隊列的wait喚醒了。

第一步,兩個put都進入了if語句,且進入了阻塞等待(進入wait后會有一步操作是釋放鎖,所以可能出現兩個線程都能進行put這個操作)。

第二步,另一個線程執行了take方法,喚醒了其中一個put,然后這個put繼續往下添加元素,添加完后又執行了一步notify,將另一個put方法喚醒了(這個put方法本應該繼續阻塞等待的)。

如上面這種情況,就是不符合我們預期的bug,并且好像還很難處理,如果我們使用兩個不同的鎖對象來解決,又無法實現鎖競爭。那我們要如何解決呢?我們可以將if改為while,這意味著wait喚醒之后,需要再判定一次條件。如果再次判斷條件,發現隊列還是滿的,那也就是說在wait等待的過程中,已經有其他線程進行了插入,此時就要繼續阻塞等待。

Java標準庫中也建議,wait要搭配while循環進行使用。

完整代碼:

class MyBlockingQueue2{private String[] elem = null;private int head = 0;private int tail = 0;private int size = 0;private Object locker = new Object();public MyBlockingQueue2(int capcity){elem = new String[capcity];}public void put(String s) throws InterruptedException {synchronized (locker) {while (size >= elem.length) {//隊列滿了//這里需要補充讓隊列阻塞locker.wait();}//將元素入到隊尾elem[tail] = s;tail++;if (tail >= elem.length) {tail = 0;}size++;//入隊成功后進行喚醒locker.notify();}}public String take() throws InterruptedException {String s = null;synchronized (locker) {while (size == 0) {locker.wait();}s = elem[head];head++;if (head >= elem.length) {head = 0;}size--;//出隊成功后進行喚醒locker.notify();}return s;}
}

使用剛才實現的BlockQueue,寫一個簡單的生產者消費者模型

在實際的開發中,生產者消費者模型,往往是多個生產者多個消費者。這里的生產者和消費者往往不僅僅是一個線程,也可能是一個獨立的服務器程序,甚至是一組服務器程序...但最核心的仍然是阻塞隊列,使用 synchronized 和 wiat / notify 達到線程安全和阻塞的目的。

如下代碼,t1是生產者,t2是消費者,我們可以通過sleep來控制他們的生產速度和消費速度。

   public static void main(String[] args) {MyBlockingQueue2 queue2 = new MyBlockingQueue2(1000);Thread t1 = new Thread(()->{int count = 0;while (true){try {System.out.println("生產元素:"+count);queue2.put(String.valueOf(count));Thread.sleep(1000);count++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(()->{while (true){try {String s = queue2.take();System.out.println("消費元素:"+s);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}

運行結果如下:

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/81234.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/81234.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/81234.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【筆記】記一次PyCharm的問題反饋

#工作記錄 最近更新至 PyCharm 社區版的最新版本后&#xff0c;我遇到了多個影響使用體驗的問題。令人感到不便的是&#xff0c;一些在舊版本中非常便捷的功能&#xff0c;在新版本中卻變得操作復雜、不夠直觀。過去&#xff0c;我一直通過 PyCharm 內置的故障報告與反饋機制反…

學習以任務為中心的潛動作,隨地采取行動

25年5月來自香港大學、OpenDriveLab 和智元機器人的論文“Learning to Act Anywhere with Task-centric Latent Actions”。 通用機器人應該在各種環境中高效運行。然而&#xff0c;大多數現有方法嚴重依賴于擴展動作標注數據來增強其能力。因此&#xff0c;它們通常局限于單一…

相關行業發展趨勢寫一個爬蟲程序

前兩篇我利用爬蟲進行營銷推廣&#xff0c;并且寫了一個品牌口碑爬蟲的代碼示例。現在根據轉向行業發展趨勢&#xff0c;可能是希望收集數據來分析市場動向、競爭對手動態或者新興技術趨勢。 技術實現方面&#xff0c;需要選擇合適的工具和庫。Python的requests和BeautifulSou…

MapReduce 實現 WordCount

在大數據處理領域&#xff0c;MapReduce 是一種極為重要的編程模型&#xff0c;它可以將大規模的數據處理任務分解為多個并行的子任務&#xff0c;從而高效地處理海量數據。WordCount&#xff08;詞頻統計&#xff09;是 MapReduce 中最經典的示例之一&#xff0c;通過它能很好…

GelSight Mobile 4.1 軟件更新推出先進測量工具與報告功能

GelSight Mobile 4.1 軟件完成版本更新&#xff0c;為表面分析領域帶來重要功能升級。此次更新圍繞測量工具與報告功能優化&#xff0c;助力用戶更高效地量化材料表面特征&#xff0c;減少現場測量時間&#xff0c;優化生產流程。 新型測量工具&#xff0c;增強復雜曲面分析能力…

base64加密為何可以直接找三方網站解密

你問得非常關鍵&#xff1a;**Base64 為什么“加密”后還能被網上輕松“解密”&#xff1f;**這是一個很多人容易誤解的點&#xff0c;下面我來用通俗的語言幫你徹底搞懂。 ? 1. Base64 根本不是加密算法 Base64 只是“編碼&#xff08;Encoding&#xff09;”&#xff0c;不是…

IP地址、端口、TCP介紹、socket介紹、程序中socket管理

1、IP地址&#xff1a;IP 地址就是 標識網絡中設備的一個地址&#xff0c;好比現實生活中的家庭地址。IP 地址的作用是 標識網絡中唯一的一臺設備的&#xff0c;也就是說通過IP地址能夠找到網絡中某臺設備。 2、端口&#xff1a;代表不同的進程,如下圖&#xff1a; 3、socket:…

【計算機網絡】HTTP/1.0,HTTP/1.1,HTTP/2,HTTP/3匯總講解,清晰表格整理面試重點對比

表格匯總 對比維度HTTP/1.0HTTP/1.1HTTP/2HTTP/3傳輸協議TCPTCPTCP/TLS&#xff08;默認加密&#xff09;UDP&#xff08;基于 QUIC 協議&#xff09;連接方式短連接&#xff08;每次請求/響應后斷開&#xff09;引入持久連接&#xff08;Persistent Connection&#xff09;&a…

LLaMA-Factory微調大模型Qwen2.5

1、開始ModelScope社區GPU環境 訓練或微調模型都是非常耗費算力的。如果電腦的配置不高,可使用一些云服務器來做這項工作。如ModelScope(魔搭)社區的GPU環境,目前提供36小時免費運算,足夠微調一個大模型了。 注冊ModelScope(魔搭)社區賬號(可能還要注冊或認證阿里云賬號)…

Python 3.13.3 安裝教程

原文來自&#xff1a;Python 3.13.3 安裝教程 | w3cschool筆記 &#xff08;請勿標記為付費&#xff01;&#xff01;&#xff01;&#xff09; Python 是一種廣泛使用的編程語言&#xff0c;廣泛應用于 Web 開發、科學計算、數據處理、人工智能等領域。Python 3.13.3 作為 P…

sqli-labs靶場29-31關(http參數污染)

目錄 前言 less29&#xff08;單引號http參數污染&#xff09; less30&#xff08;雙引號http參數污染&#xff09; less31(雙引號括號http參數污染) 前言 在JSP中&#xff0c;使用request.getParameter("id")獲取請求參數時&#xff0c;如果存在多個同名參數&a…

npm cross-env工具包介紹(跨平臺環境變量設置工具)

文章目錄 cross-env&#xff1a;跨平臺環境變量設置工具什么是cross-env&#xff1f;為什么需要cross-env&#xff1f;平臺差異帶來的問題 cross-env的工作原理核心功能技術實現 安裝與基本使用安裝步驟基本使用方法運行效果 高級使用技巧設置多個環境變量環境變量傳遞與鏈式命…

mac docker彈窗提示Docker 啟動沒有響應

一、原因分析 這臺筆記電腦是Mac M3操作系統,安裝Docker之后,Docker應用程序一直啟動不起來。 二、解決辦法 sudo rm /Library/PrivilegedHelperTools/com.docker.vmnetd sudo cp /Applications/Docker.app/Contents/Library/LaunchServices/com.docker.vmnetd /Library/Pri…

Golang基礎知識—cond

cond 通常指 sync.Cond&#xff0c;它是標準庫 sync 包中用于實現 條件變量 的同步原語。條件變量在多 goroutine 協作場景中非常有用&#xff0c;尤其在需要根據特定條件協調多個 goroutine 的執行順序時。 sync.Cond 的核心作用 條件變量用于 等待某個條件滿足 或 通知其他等…

MySQL 8.0 OCP 1Z0-908 題目解析(1)

題目001 Choose two. User fwuserlocalhost is registered with the SQL Enterprise Firewall and has been granted privileges for the sakila database. Examine these commands that you executed and the results: mysql> SELECT MODE FROM INFORMATION_SCHEMA.SQL…

【Tools】git使用詳解以及遇到問題匯總

這里寫目錄標題 安裝git安裝 TortoiseGitgit github gitlab, Gitee 區別visual studio中使用gitgit使用步驟git命令git刪除某些歷史提交記錄git找回丟失代碼git上傳文本和二進制和gitignore刪除文件刪不掉的問題 安裝git https://blog.csdn.net/mukes/article/details/1156938…

畫立方體軟件開發筆記 js-pytorch xlsx 導出 excel pnpm安裝

js-pytorch npm install -g pnpm pnpm add js-pytorch 放著&#xff0c;等我把模型訓練好了再用這個對接 xlsx pnpm install xlsx ai寫代碼&#xff0c;一遍就通了 import * as XLSX from "xlsx"; import { linelist } from ./2dviewport.js; function export…

Kotlin并發請求的一些知識記錄

private suspend fun fetchDataConcurrently(list: MutableList<MyType>,onRequestResult: (Int, List<MyType>?) -> Unit //高階函數回調) {val deferredList mutableListOf<Deferred<MyType?>>()// 設定任務超時時間為12秒&#xff0c;并使用 …

配置VScodePython環境Python was not found;

Python was not found; run without arguments to install from the Microsoft Store, or disable this shortcut from Settings > Manage App Execution Aliases. 候試試重啟電腦。 在卸載重裝python后會出現難以解決的局面&#xff0c;系統變量&#xff0c;命令行&#…

OracleLinux7.9-ssh問題

有套rac環境&#xff0c;db1主機無法ssh db1和db1-priv&#xff0c;可以ssh登錄 db2和db2-priv [rootdb1 ~]# ssh db1 ^C [rootdb1 ~]# ssh db2 Last login: Wed May 14 18:25:19 2025 from db2 [rootdb2 ~]# ssh db2 Last login: Wed May 14 18:25:35 2025 from db1 [rootdb2…