協程與通道
什么是協程
一個應用程序是運行在機器上的一個進程;進程是一個運行在自己內存地址空間里的獨立執行體。一個進程由一個或多個操作系統線程組成,這些線程其實是共享同一個內存地址空間的一起工作的執行體。
并行是一種通過使用多處理器以提高速度的能力。所以并發程序可以是并行的,也可以不是。
公認的,使用多線程的應用難以做到準確,最主要的問題是內存中的數據共享,它們會被多線程以無法預知的方式進行操作,導致一些無法重現或者隨機的結果(稱作 競態)。
不要使用全局變量或者共享內存,它們會給你的代碼在并發運算的時候帶來危險。
在 Go 中,應用程序并發處理的部分被稱作 goroutines(協程),它可以進行更有效的并發運算。在協程和操作系統線程之間并無一對一的關系:協程是根據一個或多個線程的可用性,映射(多路復用,執行于)在他們之上的;協程調度器在 Go 運行時很好的完成了這個工作。
協程是輕量的,比線程更輕。它們痕跡非常不明顯(使用少量的內存和資源):使用 4K 的棧內存就可以在堆中創建它們。因為創建非常廉價,必要的時候可以輕松創建并運行大量的協程(在同一個地址空間中 100,000 個連續的協程)。并且它們對棧進行了分割,從而動態的增加(或縮減)內存的使用;棧的管理是自動的,但不是由垃圾回收器管理的,而是在協程退出后自動釋放。
存在兩種并發方式:
-
確定性的(明確定義排序)
-
非確定性的(加鎖 / 互斥從而未定義排序)。
Go 的協程和通道理所當然的支持確定性的并發方式(例如通道具有一個 sender 和一個 receiver)。
并發和并行的差異
Go 的并發原語提供了良好的并發設計基礎:表達程序結構以便表示獨立地執行的動作;所以 Go 的重點不在于并行的首要位置:并發程序可能是并行的,也可能不是。并行是一種通過使用多處理器以提高速度的能力。但往往是,一個設計良好的并發程序在并行方面的表現也非常出色。
使用 GOMAXPROCS
在 gc 編譯器下(6g 或者 8g)你必須設置 GOMAXPROCS 為一個大于默認值 1 的數值來允許運行時支持使用多于 1 個的操作系統線程,否則所有的協程都會共享同一個線程。 當 GOMAXPROCS 大于 1 時,會有一個線程池管理眾多線程。gccgo 編譯器 會使 GOMAXPROCS 與運行中的協程數量相等。假設一個機器上有 n 個處理器或者核心。如果你設置環境變量 GOMAXPROCS>=n,或者執行 runtime.GOMAXPROCS(n),那么協程會被分割(或分散)到 n 個處理器上。但是增加處理器數量并不意味著性能的線性提升。通常,如果有 n 個核心,會設置 GOMAXPROCS 為 n-1 以獲得最佳性能,但同樣也需要保證,協程的數量 > 1 + GOMAXPROCS > 1。
所以如果在某一時間只有一個協程在執行,不要設置 GOMAXPROCS!
如何用命令行指定使用的核心數量
使用 flags 包,如下:
var numCores = flag.Int("n", 2, "number of CPU cores to use")
?
in main()
flag.Parse()
runtime.GOMAXPROCS(*numCores)
協程可以通過調用 runtime.Goexit() 來停止,盡管這樣做幾乎沒有必要。
Go 協程(goroutines)和協程(coroutines)
-
Go 協程意味著并發(或者可以以并行的方式部署),協程一般來說不是這樣的
-
Go 協程通過通道來通信;協程通過讓出和恢復操作來通信
Go 程(goroutine)是由 Go 運行時管理的輕量級線程。
go f(x, y, z)
會啟動一個新的 Go 程并執行
f(x, y, z)
f
, x
, y
和 z
的求值發生在當前的 Go 程中,而 f
的執行發生在新的 Go 程中
package main
?
import ("fmt""time"
)
?
func say(s string) {for i := 0; i < 5; i++ {time.Sleep(100 * time.Millisecond)fmt.Println(s)}
}
?
func main() {go say("world")say("hello")
}
協程間的信道
概念
而 Go 有一個特殊的類型,通道(channel),像是通道(管道),可以通過它們發送類型化的數據在協程之間通信,可以避開所有內存共享導致的坑;通道的通信方式保證了同步性。數據通過通道:同一時間只有一個協程可以訪問數據:所以不會出現數據競爭,設計如此。數據的歸屬(可以讀寫數據的能力)被傳遞。
通常使用這樣的格式來聲明通道:var identifier chan datatype
未初始化的通道的值是 nil。
所以通道只能傳輸一種類型的數據,比如 chan int 或者 chan string,所有的類型都可以用于通道,空接口 interface{} 也可以。
var ch1 chan string
ch1 = make(chan string)
當然可以更短: ch1 := make(chan string)
。
通信操作符 <-
這個操作符直觀的標示了數據的傳輸:信息按照箭頭的方向流動。
-
流向通道(發送)
-
ch <- int1
表示:用通道 ch 發送變量 int1(雙目運算符,中綴 = 發送)
-
-
從通道流出(接收),三種方式:
-
int2 = <- ch 表示:變量 int2 從通道 ch(一元運算的前綴操作符,前綴 = 接收)接收數據(獲取新值)
-
假設 int2 已經聲明過了,如果沒有的話可以寫成:int2 := <- ch。
-
<- ch
可以單獨調用獲取通道的(下一個)值,當前值會被丟棄,但是可以用來驗證,所以以下代碼是合法的:if <- ch != 1000{... }
-
package main
?
import ("fmt""time"
)
?
func main() {ch := make(chan string)
?go sendData(ch)go getData(ch)
?time.Sleep(1e9)
}
?
func sendData(ch chan string) {ch <- "Washington"ch <- "Tripoli"ch <- "London"ch <- "Beijing"ch <- "Tokio"
}
?
func getData(ch chan string) {var input string// time.Sleep(2e9)for {input = <-chfmt.Printf("%s ", input)}
}
通道阻塞
默認情況下,通信是同步且無緩沖的:在有接收者接收數據之前,發送不會結束。可以想象一個無緩沖的通道在沒有空間來保存數據的時候:必須要一個接收者準備好接收通道的數據然后發送者可以直接把數據發送給接收者。所以通道的發送 / 接收操作在對方準備好之前是阻塞的:
-
對于同一個通道,發送操作(協程或者函數中的),在接收者準備好之前是阻塞的:如果 ch 中的數據無人接收,就無法再給通道傳入其他數據:新的輸入無法在通道非空的情況下傳入。所以發送操作會等待 ch 再次變為可用狀態:就是通道值被接收時(可以傳入變量)。
-
對于同一個通道,接收操作是阻塞的(協程或函數中的),直到發送者可用:如果通道中沒有數據,接收者就阻塞了。
package main
?
import ("fmt""time"
)
?
func main() {ch1 := make(chan int)go pump(ch1)go suck(ch1)time.Sleep(1e9)
}
?
func suck(ch chan int) {for {fmt.Println(<-ch)}
}
?
func pump(ch chan int) {for i := 0; ; i++ {ch <- i}
}
上面這段程序創建兩個協程,一個用于發送一個用于接收,從開始運行直到 time.Sleep(1e9)
代碼運行完畢,程序結束。
通過一個(或多個)通道交換數據進行協程同步
通信是一種同步形式:通過通道,兩個協程在通信(協程會和)中某刻同步交換數據。無緩沖通道成為了多個協程同步的完美工具。
發送者可通過 close
關閉一個信道來表示沒有需要發送的值了。接收者可以通過為接收表達式分配第二個參數來測試信道是否被關閉:若沒有值可以接收且信道已被關閉,那么在執行完
v, ok := <-ch
之后 ok
會被設置為 false
。
循環 for i := range c
會不斷從信道接收值,直到它被關閉。
注意:
-
只有發送者才能關閉信道,而接收者不能。向一個已經關閉的信道發送數據會引發程序恐慌(panic)。
-
信道與文件不同,通常情況下無需關閉它們。只有在必須告訴接收者不再有需要發送的值時才有必要關閉,例如終止一個
range
循環。
package main
?
import ("fmt"
)
?
func main() {a := 0c := make(chan int, 10)go fibonacci(cap(c), c)for i := range c {a++fmt.Println(i)}println(a)
}
?
func fibonacci(n int, c chan int) {x, y := 0, 1for i := 0; i < n; i++ {c <- xx, y = y, x+y}close(c)
}
死鎖:
package main
?
import ("fmt"
)
?
func f1(in chan int) {fmt.Println(<-in)
}
?
func main() {out := make(chan int)out <- 2go f1(out)
}
同步通道 - 使用帶緩沖的通道
一個無緩沖通道只能包含 1 個元素,有時顯得很局限。我們給通道提供了一個緩存,可以在擴展的 make 命令中設置它的容量,如下:
buf := 100 ch1 := make(chan string, buf) buf 是通道可以同時容納的元素(這里是 string)個數
在緩沖滿載(緩沖被全部使用)之前,給一個帶緩沖的通道發送數據是不會阻塞的,而從通道讀取數據也不會阻塞,直到緩沖空了。
同步:ch :=make(chan type, value)
-
value == 0 -> synchronous, unbuffered (阻塞)
-
value > 0 -> asynchronous, buffered(非阻塞)取決于 value 元素
協程中用通道輸出結果
信號量模式
使用通道讓 main
程序等待協程完成
協程通過在通道 ch
中放置一個值來處理結束的信號。main
協程等待 <-ch
直到從中獲取到值。
select 語句
select
語句使一個 Go 程可以等待多個通信操作。
select
會阻塞到某個分支可以繼續執行為止,這時就會執行該分支。當多個分支都準備好時會隨機選擇一個執行。
從不同的并發執行的協程中獲取值可以通過關鍵字 select 來完成,它和 switch 控制語句非常相似也被稱作通信開關;它的行為像是 “你準備好了嗎” 的輪詢機制;select 監聽進入通道的數據,也可以是用通道發送值的時候。(蠻像 juc 里面 nio 的 selector 選擇器)
格式:
select {
case u:= <- ch1:...
case v:= <- ch2:......
default: // no value ready to be received...
}
例子:
package main
?
import ("fmt"
)
?
func main() {c := make(chan int, 10)quit := make(chan int)go func() {for i := 0; i < 10; i++ {fmt.Println(<-c)}quit <- 0}()fibonacci(c, quit)
}
?
func fibonacci(c, quit chan int) {x, y := 0, 1for {select {case c <- x:x, y = y, x+ycase <-quit:fmt.Println("quit")return}}
}
select 做的就是:選擇處理列出的多個通信情況中的一個。
-
如果都阻塞了,會等待直到其中一個可以處理
-
如果多個可以處理,隨機選擇一個
-
如果沒有通道操作可以處理并且寫了 default 語句,它就會執行:default 永遠是可運行的(這就是準備好了,可以執行)。
select
語句實現了一種監聽模式,通常用在(無限)循環中;在某種情況下,通過 break
語句使循環退出。
默認選擇
當 select
中的其它分支都沒有準備好時,default
分支就會執行。
為了在嘗試發送或者接收時不發生阻塞,可使用 default
分支:
select {
case i := <-c:// 使用 i
default:// 從 c 中接收會阻塞時執行
}
舉例:
package main
?
import ("fmt""time"
)
?
func main() {tick := time.Tick(100 * time.Millisecond)boom := time.After(500 * time.Millisecond)for {select {case <-tick:fmt.Println("tick.")case <-boom:fmt.Println("BOOM!")returndefault:fmt.Println(" ? .")time.Sleep(50 * time.Millisecond)}}
}
sync.Mutex
Go 標準庫中提供了 sync.Mutex 互斥鎖類型及其兩個方法:
-
Lock
-
Unlock
我們可以通過在代碼前調用 Lock
方法,在代碼后調用 Unlock
方法來保證一段代碼的互斥執行。
package main
?
import ("fmt""sync""time"
)
?
// SafeCounter 的并發使用是安全的。
type SafeCounter struct {v ? map[string]intmux sync.Mutex
}
?
// Inc 增加給定 key 的計數器的值。
func (c *SafeCounter) Inc(key string) {c.mux.Lock()// Lock 之后同一時刻只有一個 goroutine 能訪問 c.vc.v[key]++c.mux.Unlock()
}
?
// Value 返回給定 key 的計數器的當前值。
func (c *SafeCounter) Value(key string) int {c.mux.Lock()// Lock 之后同一時刻只有一個 goroutine 能訪問 c.vdefer c.mux.Unlock()return c.v[key]
}
?
func main() {c := SafeCounter{v: make(map[string]int)}for i := 0; i < 1000; i++ {go c.Inc("somekey")}
?time.Sleep(time.Second)fmt.Println(c.Value("somekey"))
}
協程和恢復(recover)
func server(workChan <-chan *Work) {for work := range workChan {go safelyDo(work) ? // start the goroutine for that work}
}
?
func safelyDo(work *Work) {defer func() {if err := recover(); err != nil {log.Printf("Work failed with %s in %v", err, work)}}()do(work)
}