goroutine并發掃描MySQL表_goroutine 并發之搜索文件內容

golang并發編程 - 例子解析

February 26, 2013

最近在看《Programming in Go》, 其中關于并發編程寫得很不錯, 受益非淺, 其中有一些例子是需要多思考才能想明白的, 所以我打算記錄下來, 強化一下思路

《Programming in Go》在?Chapter 7. Concurrent Programming?里面一共用3個例子來講述并發編程的3個模式, 第一個是?filter?, 篩選出后綴名和文件大小文件列表, 還算簡單就不說, 然后第二個是升級版, 正則版?filter?, 不同的是他是根據正則搜索出文件的文本并且列出來. 這個例子我起初看是有點蒙的, 這樣寫是沒錯, 但是為什么要這樣寫, 他的設計思路是什么, 和其他方法相比他有什么優勢, 這些都不清楚, 于是決定好好分析一下. 實際上這個例子實現的功能并不復雜, 所以我的文章實際上是在討論怎么產生出和作者相似的思路.

如果不考慮用 goroutine 的話, 思路其實很簡單:

1. 列出文件列表, 編譯正則.

2. 遍歷文件, 打開并遍歷每行, 如果正則能匹配, 記錄下來.

3. 列出來.

如果用 goroutine , 就會有以下思路:

1. 在得到文件路徑數組之后, 分發任務給N個核.

2. 每個核負責打開文件, 將符合條件的那行文本寫入到 `channel`

3. 主線程等待并接收`channel`的結果. 顯示出來, 完畢

** 然后下文才是重點 **

1. channel關閉的時機

在go中, channel 是不會自動關閉的, 所以需要在我們使用完之后手動去關閉, 而且如果使用for語法來遍歷channel每次得到的數據, 如果channel沒有關閉的話會陷入死循環. 在 goroutine 中會造成 deadlock

for job := range jobs {

fmt.Println(job)

}

如果沒close, 會觸發dead lock. 因為for...range...會自動阻塞直到讀取到數據或者channel關閉, 沒close的話就會導致整個channel處于睡眠狀態. channel關閉后, 就不允許寫入(緩沖的數據還在, 還可以讀取), 所以, channel 關閉的時機很重要.

2. 分發任務

我所知道任務分發方法有兩種:

第一種是固定分配, 如果說我想計算1+2+3+...+100, 然后分成4份, 也就是?1+2+..+25,?...,?...,?86+87+...+100, 然后再將結果累加起來.

還有一種是搶占式的, 這里需要使用一個隊列, 將所有任務寫入隊列, 然后開N個goroutine, 每個goroutine從隊列讀取任務(要確保線程安全), 處理, 完成后再繼續讀取任務. 不再是固定分配, 自己那份做完了就休息了, 所以看來第二種要好一點.

采用第二種方式的話, 對應go的做法, 那就是使用一個channel, 命名為?jobs, 將所有的任務寫入進去, 寫入完畢之后關閉這個 channel, 當然, 因為是N核, 系統能同時處理的任務我們設置為N個(也就是我們使用了N個goroutine), 那么聲明?jobs?是緩沖區長度為N的 channel.

Buffered channel?和普通的 channel 的差別是他可以同時容納多個單位數據, 當緩存的數據單位數量等于 channel 容量的時候, 再執行寫入將會阻塞, 否則都是及時處理的.

3. 結果集

當我們將數據處理后, 就需要將結果收集起來. 需要注意的是, 這些操作不是在主 goroutine 執行, 所以我們需要通過 channel 傳遞給主 goroutine . 所以只需要在外部聲明一個名為?results?的 channel . 然后在主 goroutine 通過?for?來顯示, 這時候就會發現一個問題, 這個?results?關閉的時機問題. 正確的關閉時機是寫入所有的?Result?之后. 但是別忘了我們同時開了多個 goroutine , 所以?results?應該在?執行任務的 goroutine 完成信號累計到N個?這個時機關閉. 所以我們再引入一個名叫?done?的 channel 來解決. 每個 goroutine 發送完 result 后會寫入一次done, 然后我們就可以遍歷 done , 遍歷之后說明全部完成了, 再執行顯示.

Result 的數據結構

type Result struct {

filenamestringlino int

linestring}

書中的?cgrep1?就是這樣的

func awaitCompletion(done

close(results)

}

但是這樣有可能造成死鎖, 因為書中?results?緩沖區長度限定為最大1000個, 也就是超過1000個 result 的時候再打算寫入 result 會等待取出 result 后才執行, done 也不會寫入, 而?awaitCompletion?是等到所有 goroutine 都完成了才會取出?results, 而且當?result?非常大的時候因為內存的緣故也是不可能一次性取出的. 所以就需要在讀取?results?的同時讀取?done, 當讀取?done?次數大于 N 后關閉?results, 所以, 因為要在多個 channel 中同時讀取, 所以需要使用?select.

下面是書中的?cgrep3?, 改進版:

func waitAndProcessResults(timeout int64, done

finish:= time.After(time.Duration(timeout))for working := workers; working > 0; {

select {//Blocking

case result :=

case

}for{

select {//Nonblocking

case result :=

default:

return}

}

}

看到這里, 我就有個疑問, 為什么在全部完成之后(done都接收到N個了), 還要再遍歷出?results, 直到讀取不到才算讀取完成呢(我反應一向比較慢^_^)? 于是我做了個實驗, 去掉了后面再次循環的部分, 發現有時會遺漏掉數據(我用4個測試文件...), 證明這段代碼是有用的!!!

我的想法是, 他是在處理完 result, 然后寫入?results, 寫完了才發送?done, 也就是在收到所有的 done 之后, 所有的數據應該是已經處理完成的. 為了驗證這個想法, 我寫了一下代碼:

for working := workers; working > 0; {

select {//Blocking

case result :=

//received result

case

if working <= 0{

println(len(results))

}

}

}

然后看到輸出的數是大于0的, 也就是說在接收到全部 done 之后,?results?還有數據在緩沖區中, 然后在看看發送result?的代碼, 突然就明白了

func doJobs(done chan

job.Do(lineRx)

}done

}

我把寫入和讀取想當然認為一起發生了, 因為有緩沖區的緣故, doJobs在發送進?results?的緩沖區之后就立刻發送?done?了, 但是寫入的數據有沒有被處理, 是不知道的, 所以在接收到所有?done?之后,?results?緩沖區還有數據, 需要再循環一遍.

附我的代碼一份:

package main

import ("bufio"

"fmt"

"log"

"os"

"regexp"

"runtime")

type Job struct {

filenamestringresults chan

}

type Result struct {

filenamestringlinestringlino int

}var worker = runtime.NumCPU()

func main() {//config cpu number

runtime.GOMAXPROCS(worker)

files:= os.Args[2:]

regex, err := regexp.Compile(os.Args[1])if err !=nil {log.Fatal(err)return}//任務列表, 并發數目為CPU個數

jobs := make(chan Job,worker)//結果

results := make(chan Result, minimum(1000,len(files)))

defer close(results)//標記完成

dones := make(chan int,worker)

defer close(dones)

go addJob(files, jobs,results)for i := 0; i < worker; i++{

go doJob(jobs, regex,dones)

}

awaitForCloseResult(dones,results)

}

func addJob(files []string, jobs chan

jobs

}

close(jobs)

}

func doJob(jobs

job.Do(regex)

}

dones

func awaitForCloseResult(dones

working:= 0MyForLable:

for{

select {case result :=

if working >=worker {if rlen := len(results); rlen > 0{

println("----------------------------------")

println("left:",rlen)

println("----------------------------------")for i := 1; i <= rlen; i++{

println(

}

}breakMyForLable

}

}

}

}

func (j*Job) Do(re *regexp.Regexp) {

f, err := os.Open(j.filename)if err !=nil {

println(err)return}

defer f.Close()

b:= bufio.NewReader(f)

lino:= 0

for{

line, _, err := b.ReadLine()if re.Match(line) {

j.results

}if err !=nil {break}

lino+= 1}

}

func minimum(a,b int) int {if a >b {returnb

}returna

}

func println(o...interface{}) {

fmt.Println(o...)

}

轉自:http://chenye.org/goroutine-note.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/378800.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/378800.shtml
英文地址,請注明出處:http://en.pswp.cn/news/378800.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

軟件開發模型和軟件過程模型_什么是軟件和軟件過程?

軟件開發模型和軟件過程模型軟件 (Software) Software is a set of instructions which instructs the computer for performing different operations. Software is nothing else but a general name for computer programs. 軟件是一組指令&#xff0c;指導計算機執行不同的操…

甲骨文CEO埃里森稱將在Sun裁員1000人

據國外網站報道&#xff0c;甲骨文CEO拉利埃里森周三表示&#xff0c;在完成對Sun的收購后&#xff0c;將對該公司裁員1000人。不過他同時也表示&#xff0c;未來幾個月還要新雇2000人加強Sun的業務。 分析師們曾預測甲骨文完成收購后&#xff0c;將在Sun大裁員。不過埃里森和甲…

改變Jupyter的默認項目路徑

開始接觸Jupyter&#xff0c;看見它默認的工作路徑是C盤&#xff0c;很難受想換下工作空間路徑 管理員身份打開你的Anaconda Prompt 輸入jupyter notebook --generate-config&#xff0c;找到你的配置文件位置 修改一下路徑即可 一般情況到這一步就已經修改成功了&#xff…

arm-linux-gcc/ld/objcopy/objdump使用總結[zz]

地址&#xff1a;http://hi.baidu.com/xiaoyue1800/item/a11a2c4a26da4b04c11613d9arm-linux工具的功能如下&#xff1a;arm-linux-addr2line 把程序地址轉換為文件名和行號。在命令行中給它一個地址和一個可執行文件名&#xff0c;它就會使用這個可執行文件的調試信息指出在給…

圖像分割-LOG檢測器和DOG檢測器

邊緣檢測是以較小的算子為基礎的&#xff0c;具有兩個建議 1、灰度變化與圖像尺寸無關&#xff0c;因此檢測要求使用不同尺寸的算子。 2、灰度的突然變化會在一階導數產生波峰波谷&#xff0c;在二階導數產生零交叉 大的算子檢測模糊邊緣&#xff0c;小的算子檢測銳度集中的細節…

java const string_深入研究Java String

開始寫 Java 一年來&#xff0c;一直都是遇到什么問題再去解決&#xff0c;還沒有主動的深入的去學習過 Java 語言的特性和深入閱讀 JDK 的源碼。既然決定今后靠 Java吃飯&#xff0c;還是得花些心思在上面&#xff0c;放棄一些打游戲的時間&#xff0c;系統深入的去學習。Java…

python 示例_帶有示例的Python字典update()方法

python 示例字典update()方法 (Dictionary update() Method) update() method is used to update the dictionary by inserting new items to the dictionary. update()方法用于通過將新項目插入字典來更新字典。 Syntax: 句法&#xff1a; dictionary_name.setdefault(itera…

Rsync 使用指南

Rsync是個相當棒的同步工具&#xff0c;比如&#xff1a;1. 如何做本地兩個目錄之間的同步&#xff1f;rsync -av --delete --force ~/Desktop/Miscs/ /media/disk/DesktopMiscs 這樣就可以做~/Desktop/Miscs目錄的鏡像了。/media/disk是我的移動硬盤的掛載點。這里關鍵有個問題…

C++——統計多行單個字符類型個數

鍵盤輸入n個字符&#xff0c;請分別統計大寫字母、小寫字母、數字、其他字符的個數并輸出&#xff1b;還需要輸出所有數字字符之和 【輸入形式】 第一行為一個整數n(100 > n > 0)&#xff0c;接下來n行每行一個字符 【輸出形式】 輸出第1行為4個整數&#xff0c;分別…

安卓項目4

經歷兩天的琢磨&#xff0c;終于把android連接服務器端php&#xff0c;讀取mysql這一塊弄好了。 先說說這幾天遇到的問題。 http://wenku.baidu.com/view/87ca3bfa700abb68a982fbca.html 這是我參照的資料&#xff0c;原先我一度認為是不能實例化ServiceLink類&#xff0c;后來…

system getenv_Java System類getenv()方法及示例

system getenv系統類getenv()方法 (System class getenv() method) getenv() method is available in java.lang package. getenv()方法在java.lang包中可用。 getenv() method is used to return an unmodifiable Map of the current environment variable in key-value pairs…

用ASP獲取客戶端IP地址的方法

要想透過代理服務器取得客戶端的真實IP地址&#xff0c;就要使用 Request.ServerVariables("HTTP_X_FORWARDED_FOR") 來讀取。不過要注意的事&#xff0c;并不是每個代理服務器都能用 Request.ServerVariables("HTTP_X_FORWARDED_FOR") 來讀取客戶端的真實…

C++——已知a+b、 a+c、b+c、 a+b+c,求a、b、 c

有三個非負整數a、b、 C,現按隨機順序給出它們的兩兩和以及總和4個整數&#xff0c;即ab、 ac、bc、 abc, 注意,給出的4個數的順序是隨機的&#xff0c;請根據這四個數求出a、b、c是多少? [輸入形式] 輸入為一-行4個正整數, x1、 x2、x3、 x4 (0≤xi≤10^9) &#xff0c;表示…

DDD:DomainEvent、ApplicationEvent、Command

Command&#xff1a;縱向傳遞&#xff0c;跨分層&#xff0c;在控制器層和應用層之間傳遞。 DomainEvent&#xff1a;橫向傳遞&#xff0c;跨聚合&#xff0c;在一個DLL中。 ApplicationEvent&#xff1a;橫向傳遞&#xff0c;跨模塊&#xff0c;在不同的DLL中。轉載于:https:/…

表示和描述-邊界追蹤

邊界追蹤目標&#xff1a; 輸入&#xff1a;某一區域的點 輸出&#xff1a;這一區域的點的坐標序列&#xff08;順時針或逆時針&#xff09; Moore邊界追蹤法&#xff1a; 兩個前提條件&#xff1a; 1、圖像為二值化后的圖像&#xff08;目標為1&#xff0c;背景為0&#xff0…

視頻的讀取與處理

讀取本地視頻&#xff0c;以灰度視頻輸出 import cv2vc cv2.VideoCapture(E:\Jupyter_workspace\study\data/a.mp4)#視頻路徑根據實際情況而定#檢查是否打開正確 if vc.isOpened():open,fream vc.read()#read()返回兩個參數&#xff0c;第一個參數為打開成功與否True or Fal…

更靈活的定位內存地址的方法05 - 零基礎入門學習匯編語言36

第七章&#xff1a;更靈活的定位內存地址的方法05 讓編程改變世界 Change the world by program 問題7.8 [codesyntax lang"asm"] assume cs:codesg,ds:datasg datasg segment db ibm db dec db dos db vax …

nextgaussian_Java Random nextGaussian()方法與示例

nextgaussian隨機類nextGaussian()方法 (Random Class nextGaussian() method) nextGaussian() method is available in java.util package. nextGaussian()方法在java.util包中可用。 nextGaussian() method is used to generate the next pseudo-random Gaussian double valu…

Java PriorityQueue clear()方法與示例

PriorityQueue類clear()方法 (PriorityQueue Class clear() method) clear() method is available in java.util package. clear()方法在java.util包中可用。 clear() method is used to remove all the objects from this PriorityQueue. clear()方法用于從此PriorityQueue中刪…