Java并發中常用同步工具類

為什么80%的碼農都做不了架構師?>>> ??hot3.png

同步工具類可以是任何一個對象,只要它根據其自身的狀態來協調線程控制流。阻塞隊列(BlockingQueue)可以作為同步工具類,其他類型的同步工具類還包括信號量(Semaphore),柵欄(Barrier)以及閉鎖(Latch)。在平臺類庫中還包含其他一些同步工具類的類,如果這些類還無法滿足需要,那么可以創建自己的同步工具類。

閉鎖Latch

閉鎖可以延遲線程的進度直到其到達終止狀態。閉鎖的作用相當于一扇門:在閉鎖到達結束狀態之前,這扇門一直是關著的,并且沒有任何線程能通過,當到達結束狀態時,這扇門會打開并允許所有的線程通過。當閉鎖到達結束狀態后,將不會再改變狀態,因此這扇門將永遠保持打開狀態。閉鎖可以用來確保某些活動直到其他活動都完成后才繼續執行,例如:

  • 確保某個計算在其需要的所有資源都被初始化之后才繼續執行。二元閉鎖(包括兩個狀態)可以用來表示“資源R已經被初始化”,而所有需要R的操作都必須在這個閉鎖上等待。
  • 確保某個服務在其依賴的所有其他服務都已經啟動之后才啟動。每個服務都有一個相關的二元閉鎖。當啟動服務S時,將首先在S以來的其他服務的閉鎖上等待,在所有依賴的服務都啟動后會釋放閉鎖S,這樣其他依賴S的服務才能繼續執行。
  • 等待直到某個操作的所有參與者(例如:在多玩家游戲中的所有玩家)都就緒再繼續執行。在這種情況中,當所有的玩家都準備就緒時,閉鎖將到達結束狀態。

CountDownLatch是一種靈活的閉鎖實現,可以在上述各種情況下使用,它可以使一個或多個線程等待一組事件發生。閉鎖狀態包括一個計數器,該計數器被初始化為一個正數,表示需要等待的事件數量。countDown方法將遞減計數器,表示有一個事件已經發生了,而await方法等待計數器到達零,這表示所有需要的事件都已經發生。如果計數器的值為非零,那么await會一直阻塞直到計數器為零,或者等待中的線程中斷,或者等待超時。

在下面的TestHarness中給出了閉鎖的兩種常見用法。TestHarness創建一定數量的線程,利用它們并發的執行指定的任務。它使用兩個閉鎖,分別表示“起始門(Start?Gate)”和“結束門(End Gate)”。起始門計數器的初始值為1,而結束們計數器的初始值為工作線程數量。每個工作線程首先要做的就是在啟動門上等待,從而確保所有線程都就緒后才開始執行。而每個線程要做的最后一件事情是調用countDown方法使事件數量減1,這能使主線程高效的等待直到所有工作線程都執行完成,因此可以統計所消耗的時間:

public class TestHarness {public long timeTasks(int nThread, final Runnable task) throws InterruptedException {final CountDownLatch startGate = new CountDownLatch(1);final CountDownLatch endGate = new CountDownLatch(nThread);for (int i = 0; i < nThread; i++) {Thread t = new Thread() {public void run() {try {startGate.await();try {task.run();} finally {endGate.countDown();}} catch(InterruptedException ignored) {}}};t.start();}long start = System.nanoTime();startGate.countDown();long end = System.nanoTime();return end - start;}
}

為什么要在TestHarness中使用閉鎖,而不是在線程創建后就立即啟動?或許,我們希望測試n個線程并發執行某個任務時需要的時間。如果在創建線程之后立即啟動它們,那么先啟動的線程將“領先”后啟動的線程,并且活躍線程數量會隨著時間的推移而增加或減少,競爭程度也在不斷發生變化。起始門將使得主線程能夠同時釋放所有的工作線程,而結束門則使主線程能夠等待最后一個線程執行完成,而不是順序的等待每個線程執行完成。

FutureTask

FutureTask也可以用作閉鎖。(FutureTask實現了Future語義,表示一種抽象的可生成結果的計算)。FutureTask表示的計算是通過Callable來實現的,相當于一種可生成結果的Runnable,并且可以處于以下3種狀態:等待運行(Waiting to run),正在運行(Running)和完成運行(Completed)。“執行完成”表示計算的所有可能結束方式,包括正常結束、由于取消而結束和由于異常而結束等。當FutureTask進入完成狀態以后,它會永遠停止在這個狀態上。

Future.get的行為取決于任務的狀態。如果任務已經執行完成,那么get會立即返回結果,否則get將阻塞直到任務進入完成狀態,然后返回結果或者拋出異常。FutureTask將計算結果從執行計算的線程傳遞到獲取這個結果的線程,而FutureTask的規范確保了這種傳遞過程能實現結果的安全發布。

FutureTask在Executor框架中表示異步任務,此外還可以用來表示一些時間較長的計算,這些計算可以在使用計算結果之前啟動。下面的程序就使用了FutureTask來執行一個高開銷的計算,并且計算結果將在稍后使用。通過提前啟動計算,可以減少在等待結果時需要的時間:

public class Preloader{private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(new Callable<ProductInfo>(){public ProductInfo call() throws DataLoadException{return loadProductInfo();}});private final Thread thread = new Thread(future);public void start() {thread.start();}public ProductInfo get() throws DataLoadException, InterruptedException{try {return future.get();} catch(ExecutionException e) {Throwable cause = e.getCause();if (cause instance of DataLoadException) {throw (DataLoadException)cause;} else {throw launderThrowable(cause);}}}
}

Preloader創建了一個FutureTask,其中包含從數據庫加載產品信息的任務,以及一個執行運算的線程。由于在構造函數或靜態初始化方法中啟動線程并不是一種好方法,因此提供了一個start方法來啟動線程。當程序雖有需要ProductInfo時,可以調用get方法,如果數據已經加載,那么將返回這些數據,否則將等待加載完成后再返回。

Callable表示的任務可以拋出受檢查的或未受檢查的異常,并且任何代碼都可能拋出一個Error。無論任務代碼拋出什么異常,都會被封裝到一個ExecutionException中,并在Future.get中被重新拋出。這將使調用get的代碼變得復雜,因為它不僅需要處理可能出現的ExecutionException(以及未檢查的CancellationException),而且還由于ExecutionException是作為一個Throwable類返回的,因此處理起來并不容易。

信號量Semaphore

計數信號量(Counting Semaphore)用來控制同時訪問某個特定資源的操作數量,或者同時執行某個指定操作的數量。計數信號量還可以用來實現某種資源池,或者對容器施加邊界

Semaphore中管理著一組虛擬的許可(permit),許可的初始數量可通過構造函數來指定。在執行操作時可以首先獲得許可(只要還有剩余的許可),并在使用以后釋放即可。如果沒有許可,那么aquire將阻塞直到有許可(或者直到被中斷或者操作超時)。release方法將返回一個許可給信號量。計算信號量的一種簡化形式是二值信號量,即初始值為1的Semaphore。二值信號量可以用作互斥體(mutex),并具備不可重入的加鎖語義:誰擁有這個唯一的許可,誰就擁有了互斥鎖。

Semaphore可以用于實現資源池,例如數據庫連接池。我們可以構造一個固定長度的資源池,當池為空時,請求資源將會失敗,但你真正希望看到的行為是阻塞而不是失敗,并且當池非空時解除阻塞。如果將Semaphore的計數值初始化為池的大小,并在從池中獲取一個資源之前先調用aquire方法獲得一個許可,在將資源返回給池之后調用release釋放許可,這樣就實現了固定長度了。

同樣,你可以使用Semaphore將任何一種容器變成有界阻塞容器,如下代碼所示。信號量的計數值會初始化為容器容量的最大值。add操作在向底層容器中添加一個元素之前,首先需要獲得一個許可。如果add操作沒有添加任何元素,那么會立刻釋放許可。同樣remove操作釋放一個許可,使更多的元素能夠添加到容器中。底層的Set實現并不知道關于邊界的任何信息,這是由BoundedHashSet來處理的。

public class BoundedHashSet<T> {private final Set<T> set;private final Semaphore sem;public BoundedHashSet(int bound) {this.set = Collections.synchronizedSet(new HashSet<T>());sem = new Semaphore(bound);}public boolean add(T o) throws InterruptedException {sem.aquire();boolean wasAdded = false;try {wasAdded = set.add(o);return wasAdded;} finally {if (!wasAdded) {sem.release();}}}public boolean remove(Object o) {boolean wasRemoved = set.remove(o);if (wasRemoved) {sem.release();}return wasRemoved;}
}

柵欄Barrier

我們已經看到通過閉鎖來啟動一組相關的操作,或者等待也組相關的操作結束。閉鎖是一次性對象,一旦進入終止狀態,就不能被重置。

柵欄(Barrier)類似于閉鎖,它能阻塞線程直到某個事件發生。柵欄與閉鎖的關鍵區別在于:所有線程必須同時到達柵欄位置,才能繼續執行。閉鎖也用于等待事件,而柵欄也用于等待其他線程。柵欄用于實現一些協議,例如幾個家庭決定在某個地方集合:“所有人6:00在麥當勞碰頭,到了以后要等其他人,之后再討論下一步要做的事情。”

CyclicBarrier可以使一定數量的參與方反復地在柵欄位置匯集,它在并行迭代算法中非常有用:這種算法通常將也一個問題拆分成一系列相互獨立的子問題。當線程到達柵欄位置時,將調用await方法,這個方法將阻塞直到所有線程都到達阻塞位置。如果所有線程都到達了柵欄的位置,那么柵欄將打開,此時所有線程都將被釋放,而柵欄將被重置以便下次使用。如果對await的調用超時,或者await阻塞的線程被中斷,那么柵欄就被認為是打破了,所有阻塞的await調用都將終止并拋出BrokenBarrierException。如果成功地通過柵欄,那么await將為每個線程返回一個唯一的到達索引號,我們可以利用這些索引來“選舉”產生一個領導線程,并在下一次迭代中由該領導線程執行一些特殊的工作。CyclicBarrier還可以使你將一個柵欄參數傳遞給構造函數,這是一個Runnable,當成功通過柵欄時會(在下一個子任務線程中)執行它,但在阻塞線程被釋放之前是不能執行的。

下面的例子中給出了如何通過柵欄來計算細胞的自動化模擬。在把模擬過程并行化時,為每個元素(這里為每個細胞)分配一個獨立的線程是不現實的,也因為這將產生過多的線程,而在協調這些線程上導致的開銷將降低計算性能。合理的做法是,將問題分解成一定數量的子問題,為每個子問題分配一個線程來進行求解,之后再將所有的結果合并起來。CellularAutomata將問題分解為Ncpu(可用的CPU數量)個子問題,并將每個子問題分配給一個線程。在每個步驟中,工作線程都為各自子問題中的所有細胞計算新值。當所有工作線程都到達柵欄時,柵欄會把這些新值交給數據模型。在柵欄的操作執行完成以后,工作線程將開始下一步的計算,包括調用isDone方法來判斷是否需要進行下一次迭代。

public class CellularAutomata {private final Board mainBoard;private final CyclicBarrier barrier;private final Worker[] workers;public CellularAutomata (Board board) {this.mainBoard = board;int count = Runtime.getRuntime().availableProcessors();this.barrier = new CyclicBarrier(count, new Runnable(){public void run() {mainBoard.commitNewValues();}});this.workers = new Worker[count];for (int i = 0; i < count; i++) {workers[i] = new Worker(mainBoard.getSubBoard(count, i));}}private class Worker implements Runnable {private final Board board;public Worker(Board board) {this.board = board;}public void run() {while(!board.hasConverged()) {for (int x = 0; x < board.getMaxX(); x++) {for (int y = 0; y < board.getMaxY(); y++) {board.setNewValue(x, y, computeValue(x, y));}}try {barrier.await();} catch(InterruptedException ex) {return;} catch(BrokenBarrierException ex) {return;}}}}public void start() {for (int i = 0; i < workers.length; i++) {new Thread(workers[i]).start();}mainBoard.waitForConvergence();}
}

另一種形式的柵欄就是Exchanger,它是一種兩方(Two-Party)柵欄,各方在柵欄位置上交換數據。當兩方執行不對稱的操作時,Exchanger會非常有用,例如當一個線程向緩沖區寫入數據,而另一個線程從緩沖區中讀取數據。這些線程可以使用Exchanger來匯合,并將滿的緩沖區與空的緩沖區交換。當兩個線程通過Exchanger交換對象時,這種交換就把這兩個對象安全地發布給另一方。

數據交換的時機取決于應用程序的響應需求。最簡單的方案是,當緩沖區被填滿時,由填充任務進行交換,當緩沖區為空時,由清空任務進行交換。這樣會把需要交換的次數降至最低,但如果新數據的到達率不可預測,那么一些數據的處理過程就將也延遲。另一個方法是,不僅當緩沖被填滿時進行交換,并且當緩沖被填充到一定程度并保持也一定時間也以后,也進行交換。

轉載于:https://my.oschina.net/itblog/blog/775918

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/260267.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/260267.shtml
英文地址,請注明出處:http://en.pswp.cn/news/260267.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux平臺Oracle多個實例啟動說明

環境說明:oracle實例1的SID為orcl(為默認啟動的實例),第二個實例的SID為orcl2 啟動步驟&#xff1a; 1&#xff09;啟動數據庫實例完成后&#xff0c;啟動數據庫監聽服務 #lsnrctl start 2&#xff09;切換到需要啟動的數據庫實例下&#xff0c;如下表示啟動的是orcl數據庫…

RTMP協議發送H.264編碼及AAC編碼的音視頻,實現攝像頭直播

RTMP協議發送H.264編碼及AAC編碼的音視頻&#xff0c;實現攝像頭直播 摘要: RTMP協議發送H.264編碼及AAC編碼的音視頻&#xff0c;實現攝像頭直播  RTMP&#xff08;Real Time Messaging Protocol&#xff09;是專門用來傳輸音視頻數據的流媒體協議&#xff0c;最初由Macrome…

java消息順序執行_Apache Flink:如何并行執行但保持消息順序?

請在下面找到使用側輸出和插槽組進行本地擴展的示例 .package org.example/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regardi…

python的字符串定界符可以使用_使用Template格式化Python字符串的方法

對Python字符串&#xff0c;除了比較老舊的%&#xff0c;以及用來替換掉%的format&#xff0c;及在python 3.6中加入的f這三種格式化方法以外&#xff0c;還有可以使用Template對象來進行格式化。from string import Template&#xff0c;可以導入Template類。實例化Template類…

【ES實戰】ES6.7的tar包離線安裝幫助手冊

Elasticsearch6.7部署幫助手冊 校驗時間&#xff1a;2023年12月19日 文章目錄 Elasticsearch6.7部署幫助手冊安裝前準備安裝包安裝要求鎖定內存,修改最大文件描述符,最大線程數內核參數 部署規劃端口規劃用戶規劃目錄規劃 安裝步驟每個服務器配置JDK配置文件master角色node角色…

jenkins 部署文檔

Jenkins是一個非常出色的持續集成服務器&#xff0c;本文主要介紹在CentOS系統中Jenkins的基本安裝配置方法&#xff0c;供參考。一. 軟件包&#xff1a;1. 下載apache-maven-2.2.1-bin.tarhttp://www.apache.org/dyn/closer.cgi/maven/binaries/apache-maven-2.2.1-bin.tar.gz…

牛人,多看看他們寫的東西

計算機大師 Donald E. Knuth&#xff08;高德納&#xff09; 算法大師&#xff0c;我最崇拜的計算機科學家&#xff0c;沒有之一&#xff01;不認識高爺爺的人別說自己是學計算機的。《The Art of Computer Programming》絕對是計算機科學的圣經。對高爺爺的崇敬&#xff0c;對…

System.Math.Min(System.Threading.Interlocked.Increment(i), i - 1)

System.Math.Min(System.Threading.Interlocked.Increment(i), i - 1) 在vb里面 等價于ii-1 在C#里面 等價于i-- 是有C#自動轉VB時轉換的轉載于:https://www.cnblogs.com/YaDi/archive/2012/11/08/2759802.html

java快速查找中位數_用QuickSort快速查找中位數(median)

中位數(median)是一個排好序的元素中中間位置的元素&#xff0c;如果元素個數為偶數&#xff0c;則是中間兩個元素的平均值。例如(3,1,5)的中位數是3&#xff0c;而(2,1,3,5)的中位數是2.5。查找中位數屬于SelectionAlgorithms的一種。用快速排序可以做到每次divide之后&#x…

python安裝mysql數據庫_windows10安裝mysql-8.0.13(zip安裝)~Python安裝mysql

windows10安裝mysql-8.0.13(zip安裝)安裝環境說明系統版本&#xff1a;windows10mysql版本&#xff1a;mysql-8.0.13-winx64.zip下載地址&#xff1a;http://mirrors.163.com/mysql/Downloads/MySQL-8.0/mysql-8.0.13-winx64.zip解壓安裝包解壓路徑&#xff1a;D:\develop\soft…

centos 下使用sublime

CentOS 之 Sublime text3 安裝及配置&#xff08;不支持中文輸入&#xff09; sublime text 的界面友好&#xff0c;自動補全功能也不錯。 &#xff08;本來用vimphp_function.txt的形式進行補全的&#xff0c;但是配置后的補全不太滿意&#xff0c;放棄了。 具體參見&#xff…

20121108團隊博客(蘇若)

PS&#xff1a;這本是屬于昨晚的帖子&#xff0c;對不住忠仔。現在補上。 忠仔&#xff0c;終于交給了我一個實實在在的任務&#xff0c;很是欣喜&#xff0c;也很是忐忑&#xff0c;生怕自己不能及時完成任務。 好了&#xff0c;廢話不多說&#xff0c;步入正題。 接下任務【畫…

python 倒排索引 性能_python 實現倒排索引的方法

代碼如下&#xff1a;#encoding:utf-8fin open(1.txt, r)建立正向索引:“文檔1”的ID > 單詞1&#xff1a;出現位置列表&#xff1b;單詞2&#xff1a;出現位置列表&#xff1b;…………“文檔2”的ID > 此文檔出現的關鍵詞列表。forward_index {}for line in fin:line…

pythonnet下載_Python for .NET

Python for .NET 是一個可以讓 Python 程序員近乎無縫的集成 .NET 通用語言環境 CLR 和以及為 .NET 開發者提供一個強大的應用腳本工具。通過這個項目你可在 .NET 中完全使用 Python 來編寫整個應用&#xff0c;使用 .NET 服務和組件。這個包并沒有用 CLR 語言實現一個 Python&…

webService詳解

什么是webService WebService&#xff0c;顧名思義就是基于Web的服務。它使用Web(HTTP)方式&#xff0c;接收和響應外部系統的某種請求。從而實現遠程調用. 1:從WebService的工作模式上理解的話&#xff0c;它跟普通的Web程序&#xff08;比如ASP、JSP等&#xff09;并沒有本…

讀《有人負責,才有質量:寫給在集市中迷失的一代》總結與感想

在大伙都在吹捧“市集”開發軟件的方式的大浪潮下&#xff0c;作者看到了其中的不當之處&#xff0c;發現其中有許多的問題&#xff0c;因此寫下這篇文章給予吹捧“市集”的人一個提醒&#xff0c;甚至警告。 在該文章里&#xff0c;作者認為“市集”里的“農民”不可能建造出和…

php 判斷是否文件,利用PHP判斷文件是否為圖片的方法總結

前言在網頁設計中&#xff0c;如果需要圖片&#xff0c;我們通常拿到的是一個圖片的文件名。僅僅通過文件名是無法判斷該文件是否是一個圖片文件的。或許有的人以為通過后綴名就可以判斷&#xff0c;別忘了文件的后綴名是可以隨便改動的。更何況&#xff0c;在 Linux 系統下是不…

textedit怎么插入數據_還在手動插入Excel交叉空白行?這個小技巧10秒搞定

導讀&#xff1a;前幾天有同學在后臺提問&#xff0c;怎么快速在Excel中隔行插入一行或者多行空白行&#xff0c;其實在早期我們分享的小視頻中有利用過類似的小技巧來制作工資條&#xff0c;今天我們用它來插入空白行。文/ 芒種學院指北針Hello&#xff0c;大家好&#xff0c;…

python制作安裝包(setup.py)

1.制作setup.py from distutils.core import setupsetup(nameMyblog,version1.0,descriptionMy Blog Distribution Utilities,authorlujianxing,author_emaillujianxinglujianxing.com,urlhttp://blog.lujianxing.com,py_modules[foo] ) py_modules 定義 需要打包的模塊名 2.創…

[Ruby]$: 是什么意思?

ruby comes with a set of predefined variables$: default search path (array of paths)其他Ruby特殊變量&#xff1a; $! 最近一次的錯誤信息 $ 錯誤產生的位置 $_ gets最近讀的字符串 $. 解釋器最近讀的行數(line number) $& 最近一次與正則表達式匹配的字符串 $~ 作為…