Go語言channel與select原理

本文會嘗試解釋 go runtime 中 channel 和 select 的具體實現,部分內容來自 gophercon2017。Go版本為1.8.3

channel

第一部分講述一下 channel 的用法。channel 可以看做一個隊列,用于多個goroutine之間的通信,例如下面的例子,一個goroutine發送msg,另一個msg接受消息。channel 分為帶緩沖和不帶緩沖,差別不是很大,具體請自行google。看一個簡單的例子,了解一下channel的使用。

package mainimport "fmt"func main() {// Create a new channel with `make(chan val-type)`.// Channels are typed by the values they convey.messages := make(chan string)// Send a value into a channel using the `channel <-`// syntax. Here we send `"ping"`  to the `messages`// channel we made above, from a new goroutine.go func() { messages <- "ping" }()// The `<-channel` syntax receives a value from the// channel. Here we'll receive the `"ping"` message// we sent above and print it out.msg := <-messagesfmt.Println(msg)
}

channel的功能點:

  1. 隊列
  2. 阻塞
  3. 當一端阻塞,可以被另一個端喚醒

我們圍繞這3點功能展開,講講具體的實現。

channel結構

注釋標注了幾個重要的變量,從功能上大致可以分為兩個功能單元,一個是 ring buffer,用于存數據; 一個是存放 goroutine 的隊列。

type hchan struct {qcount   uint           // 當前隊列中的元素個數dataqsiz uint           // 緩沖隊列的固定大小buf      unsafe.Pointer // 緩沖數組elemsize uint16closed   uint32elemtype *_type // element typesendx    uint   // 下一次發送的 indexrecvx    uint   // 下一次接收的 indexrecvq    waitq  // 接受者隊列sendq    waitq  // 發送者隊列// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex
}

Ring Buffer

主要是以下變量組成的功能, 一個 buf 存儲實際數據,兩個指針分別代表發送,接收的索引位置,配合 size, count 在數組大小范圍內來回滑動。

qcount   uint           // 當前隊列中的元素個數
dataqsiz uint           // 緩沖隊列的固定大小
buf      unsafe.Pointer // 緩沖數組
sendx    uint   // 下一次發送的 index
recvx    uint   // 下一次接收的 index

舉個例子,假設我們初始化了一個帶緩沖的channel, ch := make(chan int, 3), 那么它初始狀態的值為:

qcount   = 0
dataqsiz = 3
buf      = [3]int{0, 0, 0} // 表示長度為3的數組
sendx    = 0
recvx    = 0

第一步,向 channel 里 send 一個值, ch <- 1, 因為現在緩沖還沒滿,所以操作后狀態如下:

qcount   = 1
dataqsiz = 3
buf      = [3]int{1, 0, 0} // 表示長度為3的數組
sendx    = 1
recvx    = 0

快進兩部,連續向 channel 里 send 兩個值 (2, 3),狀態如下:

qcount   = 3
dataqsiz = 3
buf      = [3]int{1, 2, 3} // 表示長度為3的數組
sendx    = 0 // 下一個發送的 index 回到了0
recvx    = 0

從 channel 中 receive 一個值, <- ch, 狀態如下:

qcount   = 2
dataqsiz = 3
buf      = [3]int{1, 2, 3} // 表示長度為3的數組
sendx    = 0 // 下一個發送的 index 回到了0
recvx    = 1 // 下一個接收的 index

阻塞

我們看下,如果 receive channel 時,channel 的 buffer中沒有數據是怎么處理的。邏輯在 chanrecv 這個方法中,它的大致流程如下,僅保留了阻塞操作的代碼。

func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// 檢查 channdel 是否為 nil// 當不阻塞時,檢查buffer大小,當前大小,檢查chennel是否關閉,看看是否能直接返回// 檢查發送端是否有等待的goroutine,下部分會提到// 當前buffer中有數據,則嘗試取出。// 如果非阻塞,直接返回// 沒有sender等待,buffer中沒有數據,則阻塞等待。gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.selectdone = nilmysg.c = cgp.param = nilc.recvq.enqueue(mysg)//關鍵操作:設置 goroutine 狀態為 waiting, 把 G 和 M 分離goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)// someone woke us up// 被喚醒,清理 sudogif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed
}

這里的操作就是 創建一個 當前 goroutine 的 sudog, 然后把這個 sudog 放入 channel 的接受者等待隊列;設置當前 G 的狀態,和 M分離,到這里當前G就阻塞了,代碼不會執行下去。
當被喚醒后,執行sudog的清理操作。這里接受buffer中的值的指針是 ep 這個變量,被喚醒后好像沒有向 ep 中賦值的操作。這個我們下部分會講。

sudog

還剩最后一個疑問,當一個goroutine因為channel阻塞,另一個goroutine是如何喚醒它的。

channel 中有兩個 waitq 類型的變量, 看下結構發現,就是sudog的鏈表,關鍵是 sudog。sudog中包含了goroutine的引用,注意一下 elem這個變量,注釋說可能會指向stack。

type waitq struct {first *sudoglast  *sudog
}type sudog struct {// The following fields are protected by the hchan.lock of the// channel this sudog is blocking on. shrinkstack depends on// this.g          *gselectdone *uint32 // CAS to 1 to win select race (may point to stack)next       *sudogprev       *sudogelem       unsafe.Pointer // data element (may point to stack)// The following fields are never accessed concurrently.// waitlink is only accessed by g.acquiretime int64releasetime int64ticket      uint32waitlink    *sudog // g.waiting listc           *hchan // channel
}

講阻塞部分的時候,我們看到goroutine被調度之前,有一個 enqueue操作,這時,當前G的sudog已經被存入recvq中,我們看下發送者這時的操作。

這里的操作是,sender發送的值 直接被拷貝到 sudog.elem 了。然后喚醒 sudog.g ,這樣對面的receiver goroutine 就被喚醒了。具體請下面的注釋。

func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 檢查工作// 如果能從 chennel 的 recvq 彈出 sudog, 那么直接sendif sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) })return true}// buffer有空余空間,返回; 阻塞操作
}func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {// 處理 index// 關鍵if sg.elem != nil {// 這里是根據 elemtype.size 復制內存sendDirect(c.elemtype, sg, ep)sg.elem = nil}// 一些處理// 重新設置 goroutine 的狀態,喚醒它goready(gp, 4)
}func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {// src is on our stack, dst is a slot on another stack.// Once we read sg.elem out of sg, it will no longer// be updated if the destination's stack gets copied (shrunk).// So make sure that no preemption points can happen between read & use.dst := sg.elemtypeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)memmove(dst, src, t.size)
}// memmove copies n bytes from "from" to "to".
// in memmove_*.s
//go:noescape
func memmove(to, from unsafe.Pointer, n uintptr)

select

在看 chanrecv()方法 時,發現了一個 block 參數,代表操作是否阻塞。一般情況下,channel 都是阻塞的(不考慮buffer),那什么時候非阻塞呢?

第一個想到的就是 select, 在寫了default case的時候,其他的channel是非阻塞的。

還有一個可能不常用,就是 channel 的反射 value, 可以是非阻塞的,這個方法是public的,我們先看下簡單的。

func (v Value) TryRecv() (x Value, ok bool)
func (v Value) TrySend(x Value) bool

select 就復雜一點點,首先在源碼中發現一段注釋:

// compiler implements
//
//    select {
//    case c <- v:
//        ... foo
//    default:
//        ... bar
//    }
//
// as
//
//    if selectnbsend(c, v) {
//        ... foo
//    } else {
//        ... bar
//    }
//
func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
}// compiler implements
//
//    select {
//    case v = <-c:
//        ... foo
//    default:
//        ... bar
//    }
//
// as
//
//    if selectnbrecv(&v, c) {
//        ... foo
//    } else {
//        ... bar
//    }
//
func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {selected, _ = chanrecv(t, c, elem, false)return
}

如果是一個 case + default 的模式,那么編譯器就調用以上方法來實現。

如果是多個 case + default 的模式呢?select 在runtime到底是如何執行的?寫個簡單的select編譯一下。

package mainfunc main() {var ch chan intselect {case <-ch:case ch <- 1:default:}
}

go tool compile -S -l -N test.go > test.s 結果中找一下關鍵字,例如:

0x008c 00140 (test.go:5)    CALL    runtime.newselect(SB)
0x00ad 00173 (test.go:6)    CALL    runtime.selectrecv(SB)
0x00ec 00236 (test.go:7)    CALL    runtime.selectsend(SB)
0x0107 00263 (test.go:8)    CALL    runtime.selectdefault(SB)
0x0122 00290 (test.go:5)    CALL    runtime.selectgo(SB)

這里 selectgo 是實際運行的方法,找一下,注意注釋。先檢查channel是否能操作,如果不能操作,就走 default 邏輯。

loop:// pass 1 - look for something already waitingvar dfl *scasevar cas *scasefor i := 0; i < int(sel.ncase); i++ {cas = &scases[pollorder[i]]c = cas.cswitch cas.kind {// 接受數據case caseRecv:sg = c.sendq.dequeue()// 如果有 sender 在等待if sg != nil {goto recv}// 當前buffer中有數據if c.qcount > 0 {goto bufrecv}// 關閉的channelif c.closed != 0 {goto rclose}case caseSend:if raceenabled {racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)}// 關閉if c.closed != 0 {goto sclose}// 有 receiver 正在等待sg = c.recvq.dequeue()if sg != nil {goto send}// 有空間接受if c.qcount < c.dataqsiz {goto bufsend}// 走defaultcase caseDefault:dfl = cas}}if dfl != nil {selunlock(scases, lockorder)cas = dflgoto retc}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/455285.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/455285.shtml
英文地址,請注明出處:http://en.pswp.cn/news/455285.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Xadmin添加用戶小組件出錯

環境&#xff1a; Python 3.5.6 Django 2.1 Xadmin 原因&#xff1a; render函數在django2.1上有變化 解決方案&#xff1a; 1.在Python終端輸入命令help(xadmin) 查看xadmin安裝位置 得到如下輸出 FILE/root/anaconda3/envs/learndjango/lib/python3.5/site-packages/xad…

成本預算的四個步驟_全網推廣步驟有哪些?

全網推廣的步驟是什么&#xff1f;一般來說&#xff0c;搜索引擎優化是大多數中小企業常用的推廣方法。主要是通過對一些搜索引擎的排名來提高網站的曝光率&#xff0c;從而更好的提高自己網站的流量&#xff0c;從而更好的實現互聯網層面的銷售。接下來&#xff0c;讓我們學習…

undefined reference to `std::cout'等錯誤

&#xff08;1&#xff09;gcc和g都是GNU(組織)的一個編譯器。 &#xff08;2&#xff09;后綴名為.c的程序和.cpp的程序g都會當成是c的源程序來處理。而gcc不然&#xff0c;gcc會把.c的程序處理成c程序。 &#xff08;3&#xff09;對于.cpp的程序&#xff0c;編譯可以用gcc/g…

FFPLAY的原理(二)

關于包Packets的注釋從技術上講一個包可以包含部分或者其它的數據&#xff0c;但是ffmpeg的解釋器保證了我們得到的包Packets包含的要么是完整的要么是多種完整的幀。現在我們需要做的是讓SaveFrame函數能把RGB信息定稿到一個PPM格式的文件中。我們將生成一個簡單的PPM格式文件…

python生成requirements.txt的兩種方法

python項目如何在另一個環境上重新構建項目所需要的運行環境依賴包&#xff1f; 使用的時候邊記載是個很麻煩的事情&#xff0c;總會出現遺漏的包的問題&#xff0c;這個時候手動安裝也很麻煩&#xff0c;不能確定代碼報錯的需要安裝的包是什么版本。這些問題&#xff0c;requi…

node.js 安裝使用http-server

node.js npm全局安裝了http-server后我該怎么使用它&#xff1f;我在它的安裝目錄下創建了inde.html&#xff0c;瀏覽器localhost:8080可以訪問&#xff0c;那我的項目需要放在它的安裝目錄下&#xff1f;還是需要在我的項目下配置什么或者使用什么指令啟動它&#xff1f;我在我…

D - 卿學姐與魔法

卿學姐與魔法 Time Limit: 1200/800MS (Java/Others) Memory Limit: 65535/65535KB (Java/Others) Submit Status“你的膜法也救不了你 在去拯救公主的道路上&#xff0c;卿學姐披荊斬棘&#xff0c;刀刃早已銹跡斑斑。 一日卿學姐正在為武器的問題發愁&#xff0c;碰到了正…

python對excel表統計視頻教程_Python實現對excel文件列表值進行統計的方法

本文實例講述了Python實現對excel文件列表值進行統計的方法。分享給大家供大家參考。具體如下&#xff1a;#!/usr/bin/env python#codinggbk#此PY用來統計一個execl文件中的特定一列的值的分類import win32com.clientfilenameraw_input("請輸入要統計文件的詳細地址&#…

mooc后臺管理系統設計

摘 要 本設計采用Python中的Django框架實現Mooc后臺管理界面設計,django是一個完整的開源web開源框架,使用起來能夠快速的搭建你想要的網站,由于django自帶后臺管理系統,本設計中后臺管理模板采用功能更加強大的Xadmin實現。數據庫部分采用mysql5.7,由于django中有自帶封裝的數…

DirectShow系統初級指南

流媒體的處理&#xff0c;以其復雜性和技術性&#xff0c;一向廣受工業界的關注。特別伴隨著因特網的普及&#xff0c;流媒體在網絡上的廣泛應用&#xff0c;怎樣使流媒體的處理變得簡單而富有成效逐漸成為了焦點問題。選擇一種合適的應用方案&#xff0c;事半功倍。此時&#…

正則正整數含0

^0?$|^([1-9][0-9]*)?$

MySQL 數據庫導出導入操作

有時需要將 MySQL 數據庫中的數據導入到其它的數據庫中&#xff0c;這里以從 Ubuntu 系統的 MySQL 數據庫導出 zabbix 這個數據庫到 Windows 系統中的MySQL 為例。 導出數據庫 導出數據其實非常方便&#xff0c;比如將 MySQL 中的 zabbix 這個數據庫導出到當前文件夾&#xff…

您的apple id 暫時不符合使用此應用程序_Mac相機不工作時該怎么辦

蘋果公司的許多臺式機和筆記本電腦都包含一個內置網絡攝像頭&#xff0c;該公司愉快地將其稱為FaceTime相機。但是&#xff0c;如果您的Mac網絡攝像頭無法正常工作&#xff0c;并且在嘗試訪問它時顯示為斷開連接或不可用&#xff0c;則您可能不會感到高興。您可以嘗試以下操作來…

基于DirectShow的流媒體解碼和回放

一、 前言  流媒體的定義很廣泛&#xff0c;大多數時候指的是把連續的影像和聲音信息經過壓縮處理后放上網站服務器&#xff0c;讓用戶一邊下載一邊觀看、收聽&#xff0c;而不需要等整個壓縮文件下載到自己機器就可以觀看的視頻/音頻傳輸、壓縮技術。流媒體也指代由這種技術…

《知易行難》擴展練習

在學習了《知易行難》后&#xff0c;這個是一個選做的擴展練習&#xff0c;但是里面的問題真的的很好&#xff0c;所以我也將在這里真實的分享&#xff0c;但是有些敏感的人名我就隱去了。 1. 這一年你做了些什么事情&#xff1f; 1&#xff09;團隊的整合&#xff0c;將團隊…

python 裁判文書網_python - 用selenium模擬登陸裁判文書網,系統報錯找不到元素。...

問 題from selenium import webdriverfrom selenium.webdriver.common.desired_capabilities import DesiredCapabilitiesdcap dict(DesiredCapabilities.PHANTOMJS)dcap["phantomjs.page.settings.userAgent"]("Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWeb…

Python 四大主流 Web 編程框架

目前Python的網絡編程框架已經多達幾十個&#xff0c;逐個學習它們顯然不現實。但這些框架在系統架構和運行環境中有很多共通之處&#xff0c;本文帶領讀者學習基于Python網絡框架開發的常用知識,及目前的4種主流Python網絡框架&#xff1a;Django、Tornado、Flask、Twisted。 …

汕頭市隊賽 SRM16 T2

描述 貓和老鼠&#xff0c;看過吧&#xff1f;貓來了&#xff0c;老鼠要躲進洞里。在一條數軸上&#xff0c;一共有n個洞&#xff0c;位置分別在xi&#xff0c;能容納vi只老鼠。一共有m只老鼠位置分別在Xi&#xff0c;要躲進洞里&#xff0c;問所有老鼠跑進洞里的距離總和最小是…

基于django和vue的xdh官網設計

前言 本項目是使用三段分離的設計 前臺 使用materialize框架搭建的前臺頁面,后端使用的django寫的接口 后臺 使用Amazon UI 模板搭建的界面,管理各個部分的內容 項目環境 python3.7.2 django2.2.9 vue axios jQuery materialize mysql摘 要 本設計采用前后端分離的設計…

C#調用WebService實例和開發(轉)

http://www.cnblogs.com/peterpc/p/4628441.html 一、基本概念 Web Service也叫XML Web Service WebService是一種可以接收從Internet或者Intranet上的其它系統中傳遞過來的請求&#xff0c;輕量級的獨立的通訊技術。是:通過SOAP在Web上提供的軟件服務&#xff0c;使用WSDL文件…