協程和管道
- 1、協程
- 1.1、進程、線程和協程
- 1.2、goroutine的使用以及sync.WaitGroup
- 1.3、啟動多個協程
- 1.4、設置Golang并行運行的時候占用的cup數量
- 1.5、goroutine統計素數
- 2、管道
- 2.1、管道的操作
- 2.2、協程和管道協同
- 2.3、單向管道
- 2.4、多路復用之select
- 2.5、解決協程中出現的異常問題
- 3、Golang協程同步與互斥
- 3.1、互斥鎖
- 3.2、讀寫鎖
- 3.3、條件變量
1、協程
1.1、進程、線程和協程
進程(Process)就是程序在操作系統中的一次執行過程,是系統進行資源分配和調度的基本單位,進程是一個動態概念,是程序在執行過程中分配和管理資源的基本單位,每一個進程都有一個自己的地址空間。一個進程至少有 5 種基本狀態,它們是:初始態,執行態,等待狀態,就緒狀態,終止狀態。
線程是進程的一個執行實例,是程序執行的最小單元,它是比進程更小的能獨立運行的基本單位
并發:多個線程同時競爭一個位置,競爭到的才可以執行,每一個時間段只有一個線程在執行。
并行:多個線程可以同時執行,每一個時間段,可以有多個線程同時執行。
通俗的講多線程程序在單核CPU上面運行就是并發,多線程程序在多核CUP上運行就是并行,如果線程數大于CPU核數,則多線程程序在多個CPU上面運行既有并行又有并發。
Golang中的協程:
Golang中的主線程:(可以理解為線程/也可以理解為進程),在一個Golang程序的主線程上可以起多個協程。Golang 中多協程可以實現并行或者并發。
協程:可以理解為用戶級線程,這是對內核透明的,也就是系統并不知道有協程的存在,是完全由用戶自己的程序進行調度的。Golang的一大特色就是從語言層面原生支持協程,在函數或者方法前面加go關鍵字就可創建一個協程。可以說Golang中的協程就是goroutine 。
Golang 中的多協程有點類似其他語言中的多線程。
多協程和多線程:Golang 中每個goroutine (協程) 默認占用內存遠比Java 、C的線程少。
OS線程(操作系統線程)一般都有固定的棧內存(通常為 2MB 左右),一個goroutine (協程)占用內存非常小,只有 2KB 左右,多協程 goroutine切換調度開銷方面遠比線程要少。
1.2、goroutine的使用以及sync.WaitGroup
下面實現創建一個協程,在協程和主線程中分別執行打印語句,每次休眠一秒。
package mainimport ("fmt""time"
)func test() {for i := 0; i < 3; i++ {fmt.Println("test...")time.Sleep(time.Second)}
}func main() {go test()for i := 0; i < 3; i++ {fmt.Println("main...")time.Sleep(time.Second)}
}
但是有個問題,如果主線程執行的速度比較快呢,我們可以修改一下代碼,讓主線程跑快一些。
此時我們發現主線程執行完后,協程不會再繼續執行了。這是因為主線程執行完后整個程序就退出了。
所以我們需要使用sync.WaitGroup來讓主線程等待協程。
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc test() {for i := 0; i < 3; i++ {fmt.Println("test...")time.Sleep(time.Second)}wg.Done()
}func main() {wg.Add(1)go test()for i := 0; i < 3; i++ {fmt.Println("main...")time.Sleep(100)}wg.Wait()
}
可以看到此時主線程執行完后會等待協程執行完,然后才會退出。有點類似于創建進程/線程并進行進程/線程等待回收。
其中:sync.WaitGroup本質上是一個計數器,Add方法表示增加計數器,Done表示讓計數器減1,Wait表示等待協程執行完畢。
1.3、啟動多個協程
package mainimport ("fmt""sync"
)var wg sync.WaitGroupfunc test(id int) {for i := 1; i <= 3; i++ {fmt.Printf("我是協程[%v]..., i=%d\n", id, i)}wg.Done()
}func main() {for i := 1; i <= 5; i++ {wg.Add(1)go test(i)}wg.Wait()
}
1.4、設置Golang并行運行的時候占用的cup數量
可以使用runtime.NumCPU()來獲取當前計算機上CPU核心的數量。
Go運行時的調度器使用GOMAXPROCS參數來確定需要使用多少個 OS 線程來同時執行Go代碼。默認值是機器上的 CPU 核心數。 例如在一個 8 核心的機器上,調度器會把 Go 代碼同時調度到8個OS線程上。
Go語言中可以通過runtime.GOMAXPROCS()函數設置當前程序并發時占用的CPU邏輯核心數。
package mainimport ("fmt""runtime"
)func main() {num := runtime.NumCPU()fmt.Println("CPU數量為:", num)runtime.GOMAXPROCS(num - 1)
}
1.5、goroutine統計素數
現在假設要統計1->50000000中有多少素數,最普遍的做法是使用一個for循環來做,如下:
package mainimport ("fmt""time"
)func main() {u1 := time.Now().Unix()// var cnt = 0for i := 2; i <= 50000000; i++ {flag := truefor j := 2; j*j <= i; j++ {if i%j == 0 {flag = falsebreak}}if flag {// cnt++}}// fmt.Println("共有素數:", cnt)u2 := time.Now().Unix()fmt.Println("花費時間:", u2-u1)
}
我們發現運行時間高達12S。下面我們使用goroutine試試看:
我們創建5個協程來完成,每個協程處理一千萬個數據。
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc test(x int) {for i := (x-1)*10000000 + 1; i <= x*10000000; i++ {if i == 1 {continue}flag := truefor j := 2; j*j <= i; j++ {if i%j == 0 {flag = falsebreak}}if flag {}}wg.Done()
}func main() {u1 := time.Now().Unix()for i := 1; i <= 5; i++ {wg.Add(1)go test(i)}wg.Wait()u2 := time.Now().Unix()fmt.Println("花費時間:", u2-u1)
}
可以看到這里我們花費時間大大的降低了,那如果我們想實現幾個協程判斷素數,其中一個協程進行打印呢?就需要使用到下面的管道了。
2、管道
管道是Golang在語言級別上提供的goroutine間的通訊方式,我們可以使用channel在多個goroutine之間傳遞消息。如果說goroutine是Go程序并發的執行體,channel就是它們之間的連接。channel是可以讓一個goroutine發送特定值到另一個goroutine的通信機制。
Golang的并發模型是CSP(Communicating Sequential Processes),提倡通過通信共享內存而不是通過共享內存而實現通信。
Go語言中的管道(channel)是一種特殊的類型。管道像一個傳送帶或者隊列,總是遵循先入先出(First In First Out)的規則,保證收發數據的順序。每一個管道都是一個具體類型的導管,也就是聲明channel的時候需要為其指定元素類型。
2.1、管道的操作
1、channel類型。
2、創建管道需要使用make函數。
3、channel操作。
管道有發送(send)、接收(receive)和關閉(close)三種操作。其中發送和接收都需要使用<-符號,例子如下:
package mainimport "fmt"func main() {// 1.創建管道ch := make(chan int, 3)// 2.給管道發送數據ch <- 10ch <- 20ch <- 30// 3.從管道獲取數據a := <-chfmt.Println(a)fmt.Printf("值: %v, 類型: %T, 長度: %d, 容量: %d\n", ch, ch, len(ch), cap(ch))
}
管道有點類似隊列的結構特點,所以獲取的a值為10。另外打印管道返回的是一個地址,容量為3,由于我們取出了一個數據,所以長度為2。
4、管道是引用數據類型。
package mainimport "fmt"func main() {ch := make(chan int, 3)ch <- 10ch <- 20ch2 := chch2 <- 30<-ch<-chfmt.Println(<-ch2)
}
5、管道阻塞
當管道中沒有數據,再去取就會阻塞。當管道中數據寫滿了,再去取也會阻塞。
package mainfunc main() {ch := make(chan int, 3)ch <- 10ch <- 20ch <- 30ch <- 40
}
package mainimport "fmt"func main() {ch := make(chan int, 3)ch <- 10ch <- 20<-ch<-chnum := <-chfmt.Println(num)
}
6、for range遍歷管道
package mainimport "fmt"func main() {var ch = make(chan int, 5)for i := 1; i <= 5; i++ {ch <- i}for v := range ch {fmt.Println(v)}
}
注意:for range遍歷管道只有一個返回值,并且會報錯。
解決辦法:調用close函數關閉管道,這樣for range遍歷結束就會退出,不會報錯。
package mainimport "fmt"func main() {var ch = make(chan int, 5)for i := 1; i <= 5; i++ {ch <- i}close(ch)for v := range ch {fmt.Println(v)}
}
另外還可以直接使用for循環遍歷,不過需要知道管道中的元素個數。
package mainimport "fmt"func main() {var ch = make(chan int, 5)for i := 1; i <= 5; i++ {ch <- i}for i := 0; i < len(ch); i++ {fmt.Println(<-ch)}
}
2.2、協程和管道協同
7、需求:使用goroutine和channel協同工作。
- 開啟一個協程向管道中寫入數據。
- 開啟一個協程叢管道中讀取數據。
- 主線程必須等協程操作完才能退出。
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc fn1(ch chan int) {for i := 1; i <= 5; i++ {ch <- ifmt.Println("協程[1]向管道中寫入數據:", i)time.Sleep(500)}close(ch)wg.Done()
}func fn2(ch chan int) {for i := 1; i <= 5; i++ {x := <-chfmt.Println("協程[2]從管道中讀取數據:", x)time.Sleep(500)}wg.Done()
}func main() {var ch = make(chan int, 5)wg.Add(2)go fn1(ch)go fn2(ch)wg.Wait()
}
這里先讀取是因為協程1已經把數據寫入了,只不過還沒打印出來就被協程2取走并打印輸出了。管道是自帶同步和互斥機制的,所以哪怕讓協程2休眠時間遠短于協程1,協程2也會阻塞住等待。
再來看另外一個例子,使用go關鍵字配合匿名自執行函數創建協程。
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc main() {var ch = make(chan int, 3)wg.Add(1)go func() {for i := 1; i <= 3; i++ {num := <-chfmt.Println(num)}wg.Done()}()wg.Add(1)go func() {for i := 1; i <= 3; i++ {time.Sleep(time.Second)ch <- i}wg.Done()}()wg.Wait()
}
這里有點類似于C++中通過lambda表達式創建線程執行。
需求:改善上面實現的素數判斷,還是創建多個協程來判斷素數,但是我們還要創建一個協程來打印素數,這就需要實現協程間通信,所以就需要使用協程+管道。
- 創建一個管道intChain和一個協程,這個協程負責寫入需要判斷的值,然后判斷素數的協程從管道intChain中獲取數據進行判斷。
- 創建16個協程和一個管道primeChain,這16個協程從上面的管道intChain中獲取數據進行判斷,如果是素數就寫入到新創建的管道primeChain中。
- 創建一個打印素數的協程,該協程從存放素數的管道primeChain中獲取數據打印輸出。
- 主線程進行等待
但是還要注意,我們打印素數協程是使用for range遍歷管道的,所以需要close管道,而我們不能在執行方法中隨意close管道,因為可能其他協程還要寫入,所以還需要一個exitChain來標識,當判斷素數的協程執行完就向exitChain寫入true。然后我們另外創建一個協程來讀取exitChain,當十六次全部讀取完畢就可以關閉primeChain,這樣for range就不會出錯了。
實現代碼如下:
package mainimport ("fmt""sync"
)var wg sync.WaitGroupfunc putNum(intChain chan int) {for i := 2; i <= 100; i++ {intChain <- i}close(intChain)wg.Done()
}func isPrime(intChain chan int, primeChain chan int, exitChain chan bool) {for v := range intChain {flag := truefor j := 2; j*j <= v; j++ {if v%j == 0 {flag = falsebreak}}if flag {primeChain <- v}}exitChain <- truewg.Done()
}func printPrime(primeChain chan int) {for v := range primeChain {fmt.Printf("%v是素數\n", v)}wg.Done()
}func main() {intChan := make(chan int, 1000)primeChan := make(chan int, 1000)exitChan := make(chan bool, 16)wg.Add(1)go putNum(intChan)for i := 0; i < 16; i++ {wg.Add(1)go isPrime(intChan, primeChan, exitChan)}wg.Add(1)go printPrime(primeChan)wg.Add(1)go func() {for i := 0; i < 16; i++ {<-exitChan}close(primeChan)wg.Done()}()wg.Wait()fmt.Println("執行完畢...")
}
2.3、單向管道
有的時候我們會將管道作為參數在多個任務函數間傳遞, 很多時候我們在不同的任務函數中使用管道都會對其進行限制,比如限制管道在函數中只能發送或只能接收。
單向管道的實現如下,在chan左邊或右邊添加<-。
// 聲明為只寫
var chan1 chan<- int
chan1 = make(chan int, 3)// 聲明為只讀
var chan2 <-chan int
chan2 = make(chan int, 3)
舉個例子,創建兩個協程實現一個協程向管道中寫入數據,另一個協程從管道中讀取數據。按之前的寫法如下:
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc fn1(ch chan int) {for i := 1; i <= 5; i++ {ch <- ifmt.Println("協程[1]向管道中寫入:", i)time.Sleep(time.Millisecond * 100)}close(ch)wg.Done()
}func fn2(ch chan int) {for v := range ch {fmt.Println("協程[2]從管道中讀取:", v)time.Sleep(time.Millisecond * 100)}wg.Done()
}func main() {var ch = make(chan int, 5)wg.Add(1)go fn1(ch)wg.Add(1)go fn2(ch)wg.Wait()
}
這么寫其實也沒有什么問題,但是我們可以在函數參數上進一步限制管道,對于fn1來說,該管道只進行寫入,對于fn2來說,該管道只進行讀取,所以可以修改成下面的代碼:
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc fn1(ch chan<- int) {for i := 1; i <= 5; i++ {ch <- ifmt.Println("協程[1]向管道中寫入:", i)time.Sleep(time.Millisecond * 100)}close(ch)wg.Done()
}func fn2(ch <-chan int) {for v := range ch {fmt.Println("協程[2]從管道中讀取:", v)time.Sleep(time.Millisecond * 100)}wg.Done()
}func main() {var ch = make(chan int, 5)wg.Add(1)go fn1(ch)wg.Add(1)go fn2(ch)wg.Wait()
}
2.4、多路復用之select
在某些場景下我們需要同時從多個管道中讀取數據,這時候就可以使用多路復用技術。多路復用本質上是一種就緒事件的通知機制。
select的使用類似于switch語句,它有一系列case分支和一個默認的分支。每個case會對應一個管道的通信(接收或發送) 過程。select會一直等待,直到底層事件就緒時, 就會執行case分支對應的語句。 具體格式如下:
當讀取完所有數據后就會走default。
例如下面讀取兩個管道中的數據,可以創建兩個協程來讀取,也可以使用多路復用。
package mainimport "fmt"func main() {intChan := make(chan int, 10)for i := 1; i <= 10; i++ {intChan <- i}strChan := make(chan string, 5)for i := 1; i <= 5; i++ {strChan <- fmt.Sprintf("hello-%d", i)}for {select {case v := <-intChan:fmt.Println("從intChan中獲取數據:", v)case v := <-strChan:fmt.Println("從strChan中獲取數據:", v)default:fmt.Println("數據獲取完畢...")return}}
}
注意:
1、走到default表示管道中的數據都獲取完畢了,由于外層是for死循環,所以需要return退出。
2、使用select來獲取管道中的數據,不需要close管道。
2.5、解決協程中出現的異常問題
package mainimport ("fmt""time"
)func print() {for i := 0; i < 5; i++ {fmt.Println("hello...")}
}func test() {var m map[string]stringm["username"] = "張三"
}func main() {go print()go test()time.Sleep(time.Second)
}
在上面的test中,由于我們只是聲明了m,沒有使用make函數來創建空間,所以該協程出現異常導致整個程序崩潰,類似于C/C++中線程出現異常導致整個進程崩潰。
所以我們可以使用defer + recover來解決。
package mainimport ("fmt""time"
)func print() {for i := 0; i < 5; i++ {fmt.Println("hello...")}
}func test() {defer func() {err := recover()if err != nil {fmt.Println("err:", err)}}()var m map[string]stringm["username"] = "張三"
}func main() {go print()go test()time.Sleep(time.Second)
}
3、Golang協程同步與互斥
3.1、互斥鎖
多協程訪問共享資源不加以保護就會出問題,下面用多協程模擬一輪搶票。
package mainimport ("fmt""sync""time"
)var ticket = 10000
var wg sync.WaitGroupfunc GetTicket(i int) {for {if ticket > 0 {time.Sleep(time.Microsecond * 1000)fmt.Printf("協程[%d]搶到票: %d\n", i, ticket)ticket--} else {break}}wg.Done()
}func main() {for i := 1; i <= 4; i++ {wg.Add(1)go GetTicket(i)}wg.Wait()
}
多協程共享全局變量,在進行搶票的時候我們發現多個協程竟然搶到同一張票,所以我們需要加鎖保護。Golang中的互斥量使用很簡單,只需要在全局定義一個sync.Mutex對象,調用其中的Lock和Unlock方法即可。
package mainimport ("fmt""sync""time"
)var ticket = 10000
var wg sync.WaitGroup
var mutex sync.Mutexfunc GetTicket(i int) {for {mutex.Lock()if ticket > 0 {time.Sleep(time.Microsecond * 1000)fmt.Printf("協程[%d]搶到票: %d\n", i, ticket)ticket--} else {mutex.Unlock()break}mutex.Unlock()}wg.Done()
}func main() {for i := 1; i <= 4; i++ {wg.Add(1)go GetTicket(i)}wg.Wait()
}
3.2、讀寫鎖
讀寫鎖保證任何時刻只有讀者或者只有寫者,如果是寫者只能有一個寫者,如果是讀者可以有多個讀者。使用如下:
var rwMtx sync.RWMutex // 定義讀寫鎖
rwMtx.Lock() //寫者加鎖
rwMtx.Unlock() //寫者解鎖
rwMtx.RLock() // 讀者加鎖
rwMtx.RUnlock() // 讀者解鎖
下面創建一個協程協程寫入數據,另一批協程讀取數據。
package mainimport ("fmt""sync"
)var wg sync.WaitGroup
var rwMtx sync.RWMutexfunc read() {rwMtx.RLock()fmt.Println("協程讀取數據...")rwMtx.RUnlock()wg.Done()
}func write() {rwMtx.Lock()fmt.Println("協程寫入數據...")rwMtx.Unlock()wg.Done()
}func main() {for i := 0; i < 10; i++ {wg.Add(1)go write()}for i := 0; i < 10; i++ {wg.Add(1)go read()}wg.Wait()
}
3.3、條件變量
條件變量是用來實現協程同步和互斥的。使用如下:
var mutex sync.Mutex
var cond = sync.NewCond(&mutex) // 傳入鎖初始化條件變量
cond.Wait() // 等待條件變量
cond.Signal() // 喚醒一個協程
cond.Broadcast() // 喚醒所有協程
cond.L.Lock() // 加鎖,本質使用的是傳入的mutex鎖
cond.L.Unlock() // 解鎖,本質使用的是傳入的mutex鎖
加鎖可以直接使用條件變量提供的方法加鎖,也可以使用我們定義的鎖來加鎖,但是要保證是同一把鎖。
下面使用條件變量實現協程同步和互斥,需求:兩個協程交替打印奇數和偶數。
package mainimport ("fmt""sync""time"
)var mutex sync.Mutex
var cond = sync.NewCond(&mutex)
var wg sync.WaitGroup
var x = 1
var flag = truefunc fn1() {for {cond.L.Lock()for !flag {cond.Wait()}fmt.Println("協程[1]打印數據:", x)x++flag = falsetime.Sleep(time.Second)cond.Signal()cond.L.Unlock()}wg.Done()
}func fn2() {for {cond.L.Lock()for flag {cond.Wait()}fmt.Println("協程[2]打印數據:", x)x++flag = truetime.Sleep(time.Second)cond.Signal()cond.L.Unlock()}wg.Done()
}func main() {wg.Add(1)go fn1()wg.Add(1)go fn2()wg.Wait()
}
由于管道自帶同步互斥保護機制,所以也可以使用協程+管道來實現。
package mainimport ("fmt""sync""time"
)var x = 1
var wg sync.WaitGroupfunc fn1(ch1 <-chan bool, ch2 chan<- bool) {for {<-ch1fmt.Println("協程[1]打印數據:", x)time.Sleep(time.Second)x++ch2 <- true}wg.Done()
}func fn2(ch1 chan<- bool, ch2 <-chan bool) {for {<-ch2fmt.Println("協程[2]打印數據:", x)time.Sleep(time.Second)x++ch1 <- true}wg.Done()
}func main() {var ch1 = make(chan bool, 1)var ch2 = make(chan bool, 1)ch1 <- truewg.Add(1)go fn1(ch1, ch2)wg.Add(2)go fn2(ch1, ch2)wg.Wait()
}