文章目錄
- 并發編程
- go協程
- chan通道
- 無緩沖通道
- 有緩沖通道
- 創建?緩沖和緩沖通道
- 等協程
- sync.WaitGroup同步
- Runtime包
- Gosched()
- Goexit()
- 區別
- 同步變量
- sync.Mutex互斥鎖
- atomic原子變量
- Select
- Ticker定時器
- 控制并發數量
- 核心機制
- 并發編程階段練習
- 重要的細節
- 端口掃描
- 股票監控
并發編程
go協程
chan通道
必要條件:
從通道取元素的時候要先關閉通道,程序才知道你不往通道輸入了,才能取出元素來進行操作,否則會由于通道沒有被關閉,
range 操作會一直等待新的數據到來,導致程序陷入死鎖狀態。
close關閉通道不是必要的,但是如果你不關閉,還一直讀取里面的東西的話, 你要保證你有源源不斷的內容進入到通道中不能讓他處于死鎖狀態。
//在go中的通道
var 變量名 = make(chan 類型)
var c = make(chan int)
//通道可以給很多數據,取出來的時候可以一個個的pop一樣,每次取一個就pop一個
c <- 123
c <- 456
close(c) //你要取出內容的時候要先關閉通道,程序才知道你不要往通道輸入了,要打印了。否則打印失敗,因為一直處于等待狀態會死鎖。
for v := range c{fmt.Println(v) //這里會依次打印123,456
}
個人理解:
0.go協程感覺可以理解為比線程更加細的,同時也更快的一種東西,它能夠實現多線程操作
(其實應該說多協程操作,就是能同時干很多事情)1.go的協程main運行結束前他能夠做搶占運行權的能力,但是一旦main運行結束他也就寄寄了。2.`通道(Channels)`:這個很重要,在協程中如果你沒有用緩沖通道,就代表你使用go協程的話,能夠進到協程里面運行的概率比較小,因為main一直搶占執行權,如果很快就執行完main的話就沒go協程啥事情了。3.緩沖區:當你在使用Channels的時候且使用?緩沖通道,如果造成了阻塞->那么就是你要取通道的東西,但是需要等通道有東西取的時候才能往下走程序。
可如果你make有緩沖通道的時候,就不會造成阻塞的狀態,前提是拿緩沖通道東西的時候一定要有東西,否則他會一直等待,就像上面的for循環,他就是一直等待導致了死鎖,所以報錯,不是因為他代碼錯誤哈!
那么同時,如果你緩沖區有東西,就算進入了go協程也會很有可能被main程序搶占,`但如果緩沖區為空,那么main在取出數據的時候會因沒有數據而等待某個協程給數據到通道然后取出來。`
對通道與協程之間的關系詳細解釋:(核心是能夠讓goroutine協程能夠互相通信)
Go 提供了?種稱為通道的機制,?于在 goroutine 之間共享數據。
Channels 通道可以共享內置、命名、結構和引?類型的值和 指針。
兩種類型的通道:?緩沖通道和緩沖通道。
?緩沖通道?于執?goroutine之間的同步通信,?緩沖通道?于執?異步通信。
?緩沖通道保證在發送和接收發?的瞬間執?兩個 goroutine 之間的交換。緩沖通道沒有這樣的保證。 協程之間?法直接通信,但可以使?通道來使協程之間互相通信
通道分為有緩沖和?緩沖,有緩沖,容量設置后,可以異步讀取數據,不會堵塞,?緩沖的話, 放?個數據就必須取出來,否則就會阻塞
無緩沖通道
無緩沖通道的特性
發送:發送操作會阻塞,直到有 Goroutine 接收該值。
接收:接收操作會阻塞,直到有 Goroutine 發送一個值。舉例子:
`假設你要在main中進行發送數據且讀取數據是行不通的`,一定要開一個go協程發送數據,這樣的話你的main就不會阻塞,因為go和main是互不干擾的,但是go協程可能執行權限輪不到他,main會搶占,所以這時候就能解釋為什么要用sleep來休眠,等待go協程去執行發送通道了。
(當然sleep這種方式只是演示,實際中不會用,有其他方法解決)
有緩沖通道
在帶緩沖的通道中,只要緩沖區未滿,發送不會阻塞
`只要緩沖區未空,接收不會阻塞`
創建?緩沖和緩沖通道
make(chan int) //整型?緩沖通道,這里就是沒有緩沖make(chan int ,1) //整型有緩沖通道,1也代表有一個緩沖//注意甄別:緩沖和通道是不一樣的,通道是通道,你緩沖是拿來緩沖的,懂吧hhh?
示例代碼:
package mainimport ("fmt""math/rand""time"
)var ch = make(chan int, 10)func Ion() {rand.Seed(time.Now().UnixNano())intn := rand.Intn(100)fmt.Println("隨機數為:", intn)ch <- intnch <- 2 //對比數據ch <- 55 //對比數據
}
func main() {defer close(ch) //main關閉后,關閉通道//ch <- 2如果這里注釋打開,就不會進入下面的go協程,因為主程序很快搶占回來執行結束,那么go的協程在go 的main運行完就無法運行了go Ion()fmt.Println("Wait...")data := <-chfmt.Println("值為:", data) //最終取出來的是隨機數fmt.Println("Done")
}
等協程
sync.WaitGroup同步
在上述中學到了,通道取出來數據是需要等待通道中有數據的,不然會阻塞,那么這當然是一種很好的同步方式,但是當我們go協程開啟后,與main運行不相干的時候,main是不管協程的。main:“搶得過我再說”。
三個核心
var wg sync.WaitGroup
wg.Add(1)
wg.Done() //這個函數里面其實是執行wg.Add(-1)
wg.Wait()
單單看上面三個函數其實就能理解了
當我們需要go協程與主函數同步執行的時候,可以用add添加,然后wait等待,那么怎么判斷是否需要繼續wait呢,那么就是用done來判斷了,如果一直減到0的時候就不用wait了
示例代碼:
//這樣的話就不用sleep函數了,sleep我們也不清楚要多少秒才能讓協程完成,所以肯定是需要這種同步機制來操作
func wq() { var wg sync.WaitGroup//比如你每次進入go協程之前都add一下for i := 0; i < 10; i++ {fmt.Println("進入go協程之前add一下")wg.Add(1)go func(n int) {fmt.Println("我是協程:", n)defer wg.Done() //通知一下go協程OK了,然后defer是為了防止中途報錯無法釋放。defer閉包釋放好一點。}(i)}//這里需要等待wait一下wg.Wait()fmt.Println("等待結束")
}
Runtime包
這個也可以讓go協程同步
Gosched()
這個函數是用來等待協程的,如果還有協程在執行,main就會讓出執行權利,實現同步
func runtime_go_Gosched() {fmt.Println("演示Gosched")go func() {fmt.Println("go協程執行中...")runtime.Goexit()fmt.Println("go協程還能執行嗎...")}()runtime.Gosched() //等待go協程執行完fmt.Println("演示Gosched結束")}
Goexit()
exit顧名思義退出,當你go協程執行的時候可以用這個退出,然后回到外層函數繼續執行
func runtime_go_Goexit() {fmt.Println("演示Goexit")go func() {fmt.Println("go協程執行中...")for i := 0; i < 10; i++ {if i > 7 {fmt.Println("go協程退出:", i)runtime.Goexit()}}}()runtime.Gosched() //等待go協程執行完fmt.Println("演示Goexit結束")
}
區別
sync.WaitGroup的作用一般是用于等待一組 goroutine 完成任務,done之后就不會阻塞了,即使你goroutine任務done完的代碼后面還有代碼任務,那么這時候goroutine就看情況搶占調度了,運氣不好或者你main也沒啥可執行的了就會直接結束,結束了你的go攜程里面就算有代碼可能搶不過main調度。runtime就是用于等待或結束一個goruntine的,可以用多個,每一個go攜程中都能夠
同步變量
sync.Mutex互斥鎖
不管是在以前學的線程還是go中的協程,開了多線程后都會面臨著幾個程序搶占同一個變量的事情,可能會產生死鎖問題。
兩個函數,加鎖和解鎖
var Lock sync.Mutex
Lock.Lock()
Lock.Unlock()
簡單的示例代碼,使用加減法隊同一個變量操作,你就會發現不會發生死鎖問題
這里采用了x=10,然后循環加減法50次,你會發現基本不會發生一個加分或者減法一直執行的事情發生。
func Mutex_lock() {var (x int = 10wg sync.WaitGroupLock sync.Mutex)var Add = func() {defer wg.Done()Lock.Lock()x += 1fmt.Println("x++:", x)Lock.Unlock()}var Sub = func() {defer wg.Done()Lock.Lock()x -= 1fmt.Println("x--:", x)Lock.Unlock()}for i := 0; i < 50; i++ { //循環50次wg.Add(1)go Add()wg.Add(1)go Sub()}wg.Wait()fmt.Println("結束", x)
}
atomic原子變量
除了Mutex互斥鎖能夠實現變量的同步操作外,go中還提供了原子變量
因為原子操作不能被中斷,所以它需要足夠簡單,并且要求快速。因此,操作系統層面只對二進制位或整數的原子操作提供了支持。
`原子操作只支持有限的數據類型`,所以多數場景下,往往互斥鎖更合適。
這里用同樣的一個操作,加減法來實現一個變量的同步。
簡單了解一下幾個函數(以int類型為例子,其他類型一樣的)
//CAS 是 Compare-And-Swap(?較并交換)的縮寫
//作用是:?較內存中的某個值是否等于預期值,如果相等則將其更新為新值
atomic.CompareAndSwapInt32(&i, 100, 200)//Load(讀)
//讀取過程中相當于我們Metux中的加鎖解鎖操作,不會發生死鎖、變量值不同步問題
atomic.LoadInt32(&i)//Store(寫)
atomic.StoreInt32(&i, 200)//高版本的go可以直接使用:
atomic.Load(var_p)
atomic.Store(var_p)
示例代碼:
func myAtomic() {//實現域Metux一樣的操作var (x int32 = 10wg sync.WaitGroup)var Add = func() {wg.Done()atomic.AddInt32(&x, 3)fmt.Println("Add 1")}var Sub = func() {wg.Done()atomic.AddInt32(&x, -2)fmt.Println("Sub 1")}for i := 0; i < 5; i++ {wg.Add(1)go Add()wg.Add(1)go Sub()}wg.Wait()fmt.Println("結束", x)
}
Select
是Go中的?個控制結構,類似于switch語句,select會監聽case語句中channel的讀寫操作,當case可讀寫的時候,就執行case,進行讀寫操作以及case里面寫的語句,當然select是隨機選擇的
。
default是當case當中都沒有可讀寫的channel了,就會執行default中的語句
注意事項:
select中的case語句必須是?個channel操作
多個case都,select會隨機選出?個執?
`沒有可運?的case語句,且沒有default語句,select將會阻塞,直到某個case通信可以運?`
需求:
創建不同類型的chan通道,寫入數據后通過case對他進行分類別讀取。
示例代碼理解最快:
func select_case() {var (wg sync.WaitGroupd_int = make(chan int)d_string = make(chan string)d_float64 = make(chan float64))go func() {wg.Add(1) //進入一個go協程+1d_int <- 123d_int <- 456d_string <- "string1"d_string <- "string2"d_float64 <- 123.456//養成好習慣,如果你不用通道了及時關閉,否則后面遍歷處理數據的時候就會出錯close(d_int)close(d_string)close(d_float64)}()wg.Wait() //等待for {select {case r := <-d_int:fmt.Println("int:", r)case r := <-d_string:fmt.Println("string:", r)case r := <-d_float64:fmt.Println("float64:", r)default:fmt.Println("default")}time.Sleep(1 * time.Second)}
}
Ticker定時器
創建定時器后,會向定時器的管道中按照指定的周期時間,定期寫入事件,所以當你在周期到了的時候讀取數據就不會阻塞,你讀取完之后,還沒到周期時間執行就會一直阻塞。
go的定時器記住三個函數
time.NewTicker(time.Second * 1) //創建一個周期性定時器
ticker.C //ticker通道,Ticker對外僅暴露一個channel
ticker.Stop() //停止ticker
示例代碼
func myTicker() {ticker := time.NewTicker(1 * time.Second)count := 1for _ = range ticker.C {fmt.Println("執行了:", count)count++if count >= 10 {ticker.Stop()break}}
}
控制并發數量
核心機制
這里只是教你如何控制并發數量的例子,并不一定要按照我的方法來。
sem <- data
的作用sem
是一個帶緩沖的通道,緩沖區大小為workers
,即sem := make(chan 類型, workers)
。- 當通道的緩沖區已滿時,發送操作(
sem <-
)會阻塞,等待通道有空間釋放后再繼續。 - 如果通道沒有滿,發送會立即成功。
<-sem
的作用defer func() { <-sem }()
會在協程完成任務后,從通道中取出一個值,釋放空間。- 釋放的空間讓之前阻塞的發送操作可以繼續執行,從而不會導致程序崩潰或死鎖。
//這里是端口掃描的代碼,提前搬上來了解一下控制并發數量
func start_WaitGroup_scan_port() {var (wg sync.WaitGroupch = make(chan int, 1024) // 增加緩沖區,減少阻塞count intworkers = 100 // 控制并發數)var scanPort = func(hostname string, port int) {defer wg.Done()address := fmt.Sprintf("%s:%d", hostname, port)conn, err := net.DialTimeout("tcp", address, 2*time.Second)if err == nil {conn.Close()ch <- port}}// 控制并發數,很關鍵的理解sem := make(chan int, workers)for i := 0; i < 65536; i++ {wg.Add(1)sem <- 1go func(port int) {defer func() { <-sem }()scanPort("127.0.0.1", port)}(i)}go func() {wg.Wait()close(ch)}()for port := range ch {fmt.Printf("open: %d\n", port)count++}fmt.Printf("------------Open ports: %d-------------\n", count)fmt.Println("------------Scan——Done------------")
}
并發編程階段練習
重要的細節
wg.Wait()
移到一個匿名協程中處理,并在 wg.Wait()
之后關閉通道,保證了以下兩點:
- 通道關閉時,沒有協程向通道寫數據
- 主協程從通道
rep
中讀取數據,不負責關閉通道。 - 匿名協程負責等待所有任務完成并關閉通道,確保通道關閉與寫入互相隔離。
- 主協程從通道
- 避免主協程被搶占的死鎖風險
- 如果你直接在主協程中調用
wg.Wait()
和close(rep)
,主協程在執行時可能與其他協程搶占資源,導致協程無法按時完成。 - 匿名協程中執行
wg.Wait()
獨立于主協程,不受讀取數據或其他邏輯的影響。
- 如果你直接在主協程中調用
可以看到下面中
go func() {wg.Wait()close(rep)
}()
這里原本我是直接放在主函數使用,開一個匿名協程去執行的,但是gpt解釋了一下
就是:不能夠在主函數中直接等待與關閉,而是需要在go協程中進行,因為直接在主函數wait后就close的話會導致go協程中還有正在執行的,這就很矛盾了,我們的wait命名是等待,但是不要忘記了他其實是Add函數去判斷是否還需要等待,如果你提前Done了,主函數會立即搶占,那么這時候你的go協程其實還有很多代碼沒有執行完成就被搶占回去了,然后后面就是你要進行close,那么后續的go協程write相關操作就會報錯了
func start_WaitGroup_scan_port() {var (wg sync.WaitGroupcount int = 0rep = make(chan bool))var WaitGroup_scan_port = func(hostname string, port int) {address := fmt.Sprintf("%s:%d", hostname, port) //ip端口conn, err := net.DialTimeout("tcp", address, 2*time.Second) //2秒if err != nil {wg.Done()rep <- false} else {conn.Close()fmt.Printf("open: %d\n", port)wg.Done()rep <- true}}for i := 0; i <= 65535; i++ {wg.Add(1)go WaitGroup_scan_port("127.0.0.1", i)}go func() {wg.Wait()close(rep)}()for v := range rep {if v {count++}}fmt.Printf("------------Open ports: %d-------------\n", count)fmt.Println("------------Scan——Done------------")
}
端口掃描
1.使用WaitGroup進行并發,需要控制并發數量,可能會因為資源不夠而導致結果不準確、不完整。
func start_WaitGroup_scan_port() {var (wg sync.WaitGroupch = make(chan int, 1024) // 增加緩沖區,減少阻塞count intworkers = 100 // 控制并發數)var scanPort = func(hostname string, port int) {defer wg.Done()address := fmt.Sprintf("%s:%d", hostname, port)conn, err := net.DialTimeout("tcp", address, 2*time.Second)if err == nil {conn.Close()ch <- port}}// 控制并發數sem := make(chan int, workers)for i := 0; i < 65536; i++ {wg.Add(1)sem <- 1go func(port int) {defer func() { <-sem }()scanPort("127.0.0.1", port)}(i)}go func() {wg.Wait()close(ch)}()for port := range ch {fmt.Printf("open: %d\n", port)count++}fmt.Printf("------------Open ports: %d-------------\n", count)fmt.Println("------------Scan——Done------------")
}
2.使用通道,即單協程進行端口掃描
func start_Ch_scan_port() {var (ch = make(chan int)count int = 0)var Ch_scan_port = func(hostname string) {defer close(ch)for i := 0; i < 65535; i++ {//掃描到開放端口放到ch通道中即可,主函數一直等待讀取通道信息address := fmt.Sprintf("%s:%d", hostname, i) //ip端口conn, err := net.DialTimeout("tcp", address, 2*time.Second) //2秒if err == nil {conn.Close()ch <- i}}runtime.Goexit()}go Ch_scan_port("127.0.0.1")runtime.Gosched() //等待go協程執行完for i := range ch {fmt.Printf("open: %d\n", i)count++}fmt.Printf("------------Open ports: %d-------------\n", count)fmt.Println("------------Scan——Done------------")
}
股票監控
這一題中能夠領悟到挺多東西。
1.管道的讀取數據,一般是不同協程之間進行讀寫操作,所以我們需要對讀或者寫分開協程操作
2.go協程之間需要不斷對一個數據進行操作的話,一般要開一個通道然后通過對通道進行讀寫操作
最終效果
示例代碼:
package mainimport ("fmt""math/rand""sync/atomic""time"
)var price atomic.Value //股票價格
var priceCh = make(chan float64) //股票通道,讀寫之間的一個通道
var stopCh = make(chan bool) //開關監控
// 模擬股票漲跌(隨機)
func randGP() float64 {return rand.Float64()*10 + 100 //控制在110之間
}// ticker 定時獲取股票價格。模擬監控股票,且變更十次
func myTicker() {ticker := time.NewTicker(time.Second * 1)for {select {case <-ticker.C:priceCh <- randGP()case <-stopCh:fmt.Println("結束監控。")return}}
}
func update_g() {for i := range priceCh {price.Store(i)fmt.Printf("股票更新為:%.2f\n", i)}
}
func main() {go myTicker()go update_g()time.Sleep(5 * time.Second) // 監控5秒close(stopCh) // 結束監控,close后stopCh會等于0,case能夠讀取//獲取最終股票價格fmt.Printf("最終股票價格為:%.2f\n", price.Load())
}