Go語言之路————并發
- 前言
- 協程
- 管道
- Select
- sync
- WaitGroup
- 鎖
前言
- 我是一名多年Java開發人員,因為工作需要現在要學習go語言,Go語言之路是一個系列,記錄著我從0開始接觸Go,到后面能正常完成工作上的業務開發的過程,如果你也是個小白或者轉Go語言的,希望我這篇文章對你有所幫助。
- 有關go其他基礎的內容的文章大家可以查看我的主頁,接下來主要就是把這個系列更完,更完之后我會在每篇文章中掛上連接,方便大家跳轉和復習。
協程
在學go之前,大家肯定聽說過go底層天然支持并發,相信這也是很多人選擇學習這款語言的原因之一,那么它到底怎么個天然法,怎么個支持,下面我就一一道來。
Goroutine(輕量級線程),正如標題一樣,它也叫做協程,它是go的并發執行單元,是一種比線程更加輕量級的單位,創建一個協程非常簡單,只需要用到一個關鍵詞:go,go后面一定要更一個函數:
func main() {go func() {fmt.Print(1)}()
}
我這里用一個go啟動一個匿名函數,如果你copy這個代碼去執行,你會發現控制臺沒有任何打印,因為協程就跟Java的線程一樣,它是并發去執行的,當我們的main方法跑完的時候,如果協程未執行,那么 整個程序都會關掉,就沒有任何輸出了。
那怎樣讓它正常輸出呢?聰明的同學肯定會想到,讓main線程沉睡一下不就行了,我們來看看代碼:
func main() {go func() {fmt.Print(1)}()time.Sleep(1 * time.Second)
}控制臺打印:1
由此可見,讓主線程沉睡確實可以做到這點,那么我就要提出下一個問題了,如果有多個協程呢?看看下面代碼:
func main() {for i := 0; i < 10; i++ {go fmt.Println(i)}time.Sleep(1 * time.Second)
}
當把這段代碼執行后,你會發現每次執行的結果都是不一樣的,這也引出了協程的一個特性,那就是執行的時候是無序的,那有啥方法解決嗎,我們先用上面的sleep看能否解決:
每次執行協程前,我們都讓它沉睡一秒,然后主線程沉睡十秒
func main() {for i := 0; i < 10; i++ {time.Sleep(1 * time.Second)go fmt.Println(i)}time.Sleep(10 * time.Second)
}
執行后的結果:
0
1
2
3
4
5
6
7
8
9
目前來看,是做到了,但是這個方法太笨了,有啥辦法可以優雅的解決嗎,當然,go提供了管道、信號量、上下文、鎖等各種工具來輔助開發者進行并發編程。
管道
管道:channel,官方對它的解釋:Do not communicate by sharing memory; instead, share memory by communicating.
我用白話文在翻譯一次:它的作用就是解決協程之間的通信的,數據傳輸或者共享的。
一個通道,用chan來定義,定義的時候必須要指定它存的數據類型:
var ch chan int
此時的管道還沒初始化,是不能使用的,在go中,初始化一個管道,有且只有一個辦法,那就是make關鍵詞,make關鍵詞提供一個額外參數:緩沖區
var ch = make(chan int, 1)
這里就是用make創建了一個緩沖區為1的管道,先看看使用:
func main() {var ch = make(chan int, 1)ch <- 1println(<-ch)
}
輸出:1
結合例子,說一下管道的輸出和輸出:<-,沒錯就是用箭頭表示,箭頭的指向表示數據流向,a <- 1,表示把1發到a,<- a,表示從a讀取數據
如何理解緩沖區:可以理解為Java中線程池中的阻塞隊列,往管道中發送的數據會先存到緩沖區,然后才會被讀取,如果一個管道沒有緩沖區,那么發送信息后需要立馬有讀取的操作,否則程序就會阻塞,我們通過下面例子來看:
func main() {var ch = make(chan int)ch <- 1<-ch
}
我們創建一個沒有緩沖區的管道,像管道里面輸入1,馬上再讀取。看似人畜無害的代碼,執行起來確是這個結果:deadlock
fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan send]:
main.main()D:/goland/workspace/test/main.go:5 +0x2dProcess finished with the exit code 2
那讀者又會想了,既然這樣,那豈不是所有的管道創建都需要緩沖區。其實不然,如果我們通過協程去輸入就能正常輸出:
func main() {var ch = make(chan int)go func() {ch <- 1}()println(<-ch)
}
輸出:1
思考:為啥有協程的參與就能正常讀寫?我們回到緩沖區的本質,它是存數據的緩沖的,如果我們沒有緩沖區,那么證明這個管道是沒辦法存數據的,就意味著,我這邊寫了,必須馬上有人讀,但是通過同步操作是實現不了的,有協程異步來操作才可行。
注意:每個管道用完后需要我們手段關閉,直接調用系統提供的close方法,一個管道只能close一次,多次close會報錯。
func close(c chan<- Type)
但是通常,我們建議把通道的關閉結合defer來用:
func main() {var ch = make(chan int)go func() {ch <- 1defer close(ch)}()println(<-ch)
}
注意點,除了同步讀寫無緩沖管道會造成堵塞之外,下面幾種情況也會造成deadlock:
- 緩沖區滿了繼續噻數據:
緩沖區大小為1,寫入一個后滿了沒讀,繼續寫func main() {var ch = make(chan int, 1)defer close(ch)ch <- 1ch <- 1println(<-ch) }
- 有緩沖區,但是數據為空
func main() {// 創建的有緩沖管道intCh := make(chan int, 1)defer close(intCh)// 緩沖區為空,阻塞等待其他協程寫入數據<-intCh }
- 管道未初始化
func main() {var intCh chan intintCh <- 1 }
管道數據除了一個個讀之外,我們還可以用for range來遍歷一個管道:
func main() {intCh := make(chan int, 10)go func() {for i := 0; i < 10; i++ {intCh <- i}}()for ch := range intCh {println(ch)}
}
看看輸出:
0
1
2
3
4
5
6
7
8
9
fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan receive]:
main.main()D:/goland/workspace/test/main.go:10 +0xa8
在輸出之后出現了阻塞,這是因為for range會一直去讀寫管道中的數據,當管道中數據為空時就會死鎖,直到有其他協程向管道寫入數據才會解除,所以我們代碼改一下,在寫入數據完畢后就關閉管道:
func main() {intCh := make(chan int, 10)go func() {for i := 0; i < 10; i++ {intCh <- i}close(intCh)}()for ch := range intCh {println(ch)}
}
最后再補充一個知識點,管道的讀取其實是有返回值的:
v, ok := <-intCh
第一個是值,第二個是個bool代表是否讀取成功:
func main() {intCh := make(chan int, 10)go func() {intCh <- 1}()a, ok := <-intChprintln(a, ok)
}輸出:1 true
Select
在 Go 中,select 是一種管道多路復用的控制結構,某一時刻,同時監測多個元素是否可用,在這里我們可以用來檢測多個管道:
func main() {ch1 := make(chan int, 10)ch2 := make(chan int, 10)ch3 := make(chan int, 10)defer func() {close(ch1)close(ch2)close(ch3)}()select {case i := <-ch1:fmt.Println("ch1 is ", i)case j := <-ch2:fmt.Println("ch2 is ", j)case k := <-ch3:fmt.Println("ch3 is ", k)default:fmt.Print("檢測失敗")}
}
創建三個管道,然后用select分別去監測三個管道的數據,然后doSomething,讓我們沒有往管道輸入任何數據的時候,默認輸出檢測失敗,我們在select前往ch1輸入一個數據看看:
func main() {ch1 := make(chan int, 10)ch2 := make(chan int, 10)ch3 := make(chan int, 10)defer func() {close(ch1)close(ch2)close(ch3)}()ch1 <- 1select {case i := <-ch1:fmt.Println("ch1 is ", i)case j := <-ch2:fmt.Println("ch2 is ", j)case k := <-ch3:fmt.Println("ch3 is ", k)default:fmt.Print("檢測失敗")}
}輸出:ch1 is 1
sync
講到了并發,怎么能離開鎖,go的sync包下面提供了很多鎖相關的工具類,就類似于Java的juc包,我們下面簡單說點常用的。
WaitGroup
WaitGroup 即等待執行,它的方法只有三個,使用起來也非常簡單:
- Add:添加一個計數器,表示總數
- Done:每調用一次計數器減1
- Wait:如果計數器不為0,則等待
還記得我們文章開頭提到的例子嗎,就是在main線程中使用了協程,協程還未執行但是main已經結束了,當時我們用的是sleep方法,現在我們看看怎么用WaitGroup去解決這個問題:
先看看原例子:
func main() {println("start")go func() {println("doSomething")}()println("end")
}
再看看解決后的:
var waitGroup sync.WaitGroupfunc main() {println("start")waitGroup.Add(1)go func() {println("doSomething")waitGroup.Done()}()waitGroup.Wait()println("end")
}看看輸出:
start
doSomething
end
鎖
go中常用的鎖有兩個:
- 互斥鎖:sync.Mutex
- 讀寫鎖:sync.RWMutex
互斥鎖sync.Mutex ,實現了Locker 接口,它的用法非常簡單,就三個:
func (m *Mutex) Lock() {m.mu.Lock()
}func (m *Mutex) TryLock() bool {return m.mu.TryLock()
}func (m *Mutex) Unlock() {m.mu.Unlock()
}
我們先來看看互斥鎖Mutex,下面我來模擬一個經典的場景,就是不同線程對共享數據操作,讓我們看看不用鎖的情況下,會不會得到正確結果:
var wait sync.WaitGroup
var count = 0func main() {wait.Add(10)for i := 0; i < 10; i++ {go func(data *int) {// 模擬訪問耗時time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))// 訪問數據,這里必須要用temp當前數據存起來temp := *data// 模擬計算耗時time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))// 修改數據*data = temp + 1fmt.Println(*data)wait.Done()}(&count)}wait.Wait()fmt.Println("最終結果", count)
}
運行起來發現,每次的輸出都不一樣,跟Java一樣,多線程對共享數據的修改是不安全的,必須要加鎖
1
1
2
1
1
1
1
1
1
3
最終結果 3
下面我們改進一下代碼,將同步代碼用互斥鎖包起來,類似于Java的同步代碼塊:
var lock sync.Mutex
var wait sync.WaitGroup
var count = 0func main() {wait.Add(10)for i := 0; i < 10; i++ {go func(data *int) {lock.Lock()// 模擬訪問耗時time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))// 訪問數據temp := *data// 模擬計算耗時time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))// 修改數據*data = temp + 1lock.Unlock()fmt.Println(*data)wait.Done()}(&count)}wait.Wait()fmt.Println("最終結果", count)
}
go的互斥鎖很簡單,用的時候就調用lock()方法,解鎖就調用unlock()方法,看看輸出:
1
2
3
4
5
6
7
8
9
10
最終結果 10Process finished with the exit code 0
讀寫鎖和互斥鎖一樣,只是說讀寫鎖的精度更高一點,可以根據讀多寫少,或者讀少寫多的情況來判斷,它同樣實現了Locker接口,只是方法多一些,讀寫鎖內部的讀和寫是互斥鎖,并不是說有兩個鎖
// 加讀鎖
func (rw *RWMutex) RLock()// 嘗試加讀鎖
func (rw *RWMutex) TryRLock() bool// 解讀鎖
func (rw *RWMutex) RUnlock()// 加寫鎖
func (rw *RWMutex) Lock()// 嘗試加寫鎖
func (rw *RWMutex) TryLock() bool// 解寫鎖
func (rw *RWMutex) Unlock()
下面看個讀寫鎖的例子(本例來自官方中文文檔):
var wait sync.WaitGroup
var count = 0
var rw sync.RWMutexfunc main() {wait.Add(12)// 讀多寫少go func() {for i := 0; i < 3; i++ {go Write(&count)}wait.Done()}()go func() {for i := 0; i < 7; i++ {go Read(&count)}wait.Done()}()// 等待子協程結束wait.Wait()fmt.Println("最終結果", count)
}func Read(i *int) {time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))rw.RLock()fmt.Println("拿到讀鎖")time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))fmt.Println("釋放讀鎖", *i)rw.RUnlock()wait.Done()
}func Write(i *int) {time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))rw.Lock()fmt.Println("拿到寫鎖")temp := *itime.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))*i = temp + 1fmt.Println("釋放寫鎖", *i)rw.Unlock()wait.Done()
}
該例開啟了 3 個寫協程,7 個讀協程,在讀數據的時候都會先獲得讀鎖,讀協程可以正常獲得讀鎖,但是會阻塞寫協程,獲得寫鎖的時候,則會同時阻塞讀協程和寫協程,直到釋放寫鎖,如此一來實現了讀協程與寫協程互斥,保證了數據的正確性。例子輸出如下:
拿到讀鎖
拿到讀鎖
釋放讀鎖 0
釋放讀鎖 0
拿到寫鎖
釋放寫鎖 1
拿到讀鎖
拿到讀鎖
拿到讀鎖
拿到讀鎖
拿到讀鎖
釋放讀鎖 1
釋放讀鎖 1
釋放讀鎖 1
釋放讀鎖 1
釋放讀鎖 1
拿到寫鎖
釋放寫鎖 2
拿到寫鎖
釋放寫鎖 3
最終結果 3Process finished with the exit code 0
OK 上面就是go中并發的一些常用案例,不多,但是一定是最常用的,掌握了這些你就可以去深入擴展了。