golang并發編程 - 例子解析
February 26, 2013
最近在看《Programming in Go》, 其中關于并發編程寫得很不錯, 受益非淺, 其中有一些例子是需要多思考才能想明白的, 所以我打算記錄下來, 強化一下思路
《Programming in Go》在?Chapter 7. Concurrent Programming?里面一共用3個例子來講述并發編程的3個模式, 第一個是?filter?, 篩選出后綴名和文件大小文件列表, 還算簡單就不說, 然后第二個是升級版, 正則版?filter?, 不同的是他是根據正則搜索出文件的文本并且列出來. 這個例子我起初看是有點蒙的, 這樣寫是沒錯, 但是為什么要這樣寫, 他的設計思路是什么, 和其他方法相比他有什么優勢, 這些都不清楚, 于是決定好好分析一下. 實際上這個例子實現的功能并不復雜, 所以我的文章實際上是在討論怎么產生出和作者相似的思路.
如果不考慮用 goroutine 的話, 思路其實很簡單:
1. 列出文件列表, 編譯正則.
2. 遍歷文件, 打開并遍歷每行, 如果正則能匹配, 記錄下來.
3. 列出來.
如果用 goroutine , 就會有以下思路:
1. 在得到文件路徑數組之后, 分發任務給N個核.
2. 每個核負責打開文件, 將符合條件的那行文本寫入到 `channel`
3. 主線程等待并接收`channel`的結果. 顯示出來, 完畢
** 然后下文才是重點 **
1. channel關閉的時機
在go中, channel 是不會自動關閉的, 所以需要在我們使用完之后手動去關閉, 而且如果使用for語法來遍歷channel每次得到的數據, 如果channel沒有關閉的話會陷入死循環. 在 goroutine 中會造成 deadlock
for job := range jobs {
fmt.Println(job)
}
如果沒close, 會觸發dead lock. 因為for...range...會自動阻塞直到讀取到數據或者channel關閉, 沒close的話就會導致整個channel處于睡眠狀態. channel關閉后, 就不允許寫入(緩沖的數據還在, 還可以讀取), 所以, channel 關閉的時機很重要.
2. 分發任務
我所知道任務分發方法有兩種:
第一種是固定分配, 如果說我想計算1+2+3+...+100, 然后分成4份, 也就是?1+2+..+25,?...,?...,?86+87+...+100, 然后再將結果累加起來.
還有一種是搶占式的, 這里需要使用一個隊列, 將所有任務寫入隊列, 然后開N個goroutine, 每個goroutine從隊列讀取任務(要確保線程安全), 處理, 完成后再繼續讀取任務. 不再是固定分配, 自己那份做完了就休息了, 所以看來第二種要好一點.
采用第二種方式的話, 對應go的做法, 那就是使用一個channel, 命名為?jobs, 將所有的任務寫入進去, 寫入完畢之后關閉這個 channel, 當然, 因為是N核, 系統能同時處理的任務我們設置為N個(也就是我們使用了N個goroutine), 那么聲明?jobs?是緩沖區長度為N的 channel.
Buffered channel?和普通的 channel 的差別是他可以同時容納多個單位數據, 當緩存的數據單位數量等于 channel 容量的時候, 再執行寫入將會阻塞, 否則都是及時處理的.
3. 結果集
當我們將數據處理后, 就需要將結果收集起來. 需要注意的是, 這些操作不是在主 goroutine 執行, 所以我們需要通過 channel 傳遞給主 goroutine . 所以只需要在外部聲明一個名為?results?的 channel . 然后在主 goroutine 通過?for?來顯示, 這時候就會發現一個問題, 這個?results?關閉的時機問題. 正確的關閉時機是寫入所有的?Result?之后. 但是別忘了我們同時開了多個 goroutine , 所以?results?應該在?執行任務的 goroutine 完成信號累計到N個?這個時機關閉. 所以我們再引入一個名叫?done?的 channel 來解決. 每個 goroutine 發送完 result 后會寫入一次done, 然后我們就可以遍歷 done , 遍歷之后說明全部完成了, 再執行顯示.
Result 的數據結構
type Result struct {
filenamestringlino int
linestring}
書中的?cgrep1?就是這樣的
func awaitCompletion(done
close(results)
}
但是這樣有可能造成死鎖, 因為書中?results?緩沖區長度限定為最大1000個, 也就是超過1000個 result 的時候再打算寫入 result 會等待取出 result 后才執行, done 也不會寫入, 而?awaitCompletion?是等到所有 goroutine 都完成了才會取出?results, 而且當?result?非常大的時候因為內存的緣故也是不可能一次性取出的. 所以就需要在讀取?results?的同時讀取?done, 當讀取?done?次數大于 N 后關閉?results, 所以, 因為要在多個 channel 中同時讀取, 所以需要使用?select.
下面是書中的?cgrep3?, 改進版:
func waitAndProcessResults(timeout int64, done
finish:= time.After(time.Duration(timeout))for working := workers; working > 0; {
select {//Blocking
case result :=
case
}for{
select {//Nonblocking
case result :=
default:
return}
}
}
看到這里, 我就有個疑問, 為什么在全部完成之后(done都接收到N個了), 還要再遍歷出?results, 直到讀取不到才算讀取完成呢(我反應一向比較慢^_^)? 于是我做了個實驗, 去掉了后面再次循環的部分, 發現有時會遺漏掉數據(我用4個測試文件...), 證明這段代碼是有用的!!!
我的想法是, 他是在處理完 result, 然后寫入?results, 寫完了才發送?done, 也就是在收到所有的 done 之后, 所有的數據應該是已經處理完成的. 為了驗證這個想法, 我寫了一下代碼:
for working := workers; working > 0; {
select {//Blocking
case result :=
//received result
case
if working <= 0{
println(len(results))
}
}
}
然后看到輸出的數是大于0的, 也就是說在接收到全部 done 之后,?results?還有數據在緩沖區中, 然后在看看發送result?的代碼, 突然就明白了
func doJobs(done chan
job.Do(lineRx)
}done
}
我把寫入和讀取想當然認為一起發生了, 因為有緩沖區的緣故, doJobs在發送進?results?的緩沖區之后就立刻發送?done?了, 但是寫入的數據有沒有被處理, 是不知道的, 所以在接收到所有?done?之后,?results?緩沖區還有數據, 需要再循環一遍.
附我的代碼一份:
package main
import ("bufio"
"fmt"
"log"
"os"
"regexp"
"runtime")
type Job struct {
filenamestringresults chan
}
type Result struct {
filenamestringlinestringlino int
}var worker = runtime.NumCPU()
func main() {//config cpu number
runtime.GOMAXPROCS(worker)
files:= os.Args[2:]
regex, err := regexp.Compile(os.Args[1])if err !=nil {log.Fatal(err)return}//任務列表, 并發數目為CPU個數
jobs := make(chan Job,worker)//結果
results := make(chan Result, minimum(1000,len(files)))
defer close(results)//標記完成
dones := make(chan int,worker)
defer close(dones)
go addJob(files, jobs,results)for i := 0; i < worker; i++{
go doJob(jobs, regex,dones)
}
awaitForCloseResult(dones,results)
}
func addJob(files []string, jobs chan
jobs
}
close(jobs)
}
func doJob(jobs
job.Do(regex)
}
dones
func awaitForCloseResult(dones
working:= 0MyForLable:
for{
select {case result :=
if working >=worker {if rlen := len(results); rlen > 0{
println("----------------------------------")
println("left:",rlen)
println("----------------------------------")for i := 1; i <= rlen; i++{
println(
}
}breakMyForLable
}
}
}
}
func (j*Job) Do(re *regexp.Regexp) {
f, err := os.Open(j.filename)if err !=nil {
println(err)return}
defer f.Close()
b:= bufio.NewReader(f)
lino:= 0
for{
line, _, err := b.ReadLine()if re.Match(line) {
j.results
}if err !=nil {break}
lino+= 1}
}
func minimum(a,b int) int {if a >b {returnb
}returna
}
func println(o...interface{}) {
fmt.Println(o...)
}
轉自:http://chenye.org/goroutine-note.html