協程并發等待技術——WaitGroup 類型和 errgroup 包
waitgroup?
阻塞等待多個并發任務執行完成。WaitGroup 類型主要包含下面幾個方法。
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
第一個是 Add 方法,在任務運行之前,需要調用 Add 方法,用于設置需要等待完成的任務數,Add 方法傳進去的數值之和,需要和任務數相等。
第二個是 Done 方法,每個任務完成時,需要調用 Done 方法,用于告知 WaitGroup 對象已經有一個任務運行完成。
第三個是 Wait 方法,當需要等待所有并發任務完成時,調用 Wait 方法,用于阻塞主協程。
?
import ("sync"
)var urls = []string{"http://www.golang.org/","http://www.google.com/","http://www.somestupidname.com/",
}func TestWaitGroup(t *testing.T) {// 創建WaitGroupwg := sync.WaitGroup{}results := make([]string, len(urls))for index, url := range urls {url := urlindex := index// 在創建協程執行任務之前,調用Add方法wg.Add(1)go func() {// 任務完成后,調用Done方法defer wg.Done()// Fetch the URL.resp, err := http.Get(url)if err != nil {return}defer resp.Body.Close()body, err := io.ReadAll(resp.Body)if err != nil {return}results[index] = string(body)}()}// 主協程阻塞,等待所有的任務執行完成wg.Wait()
}
errgroup 包
可以在主協程中獲取并發任務錯誤信息?
import ("golang.org/x/sync/errgroup"
)func TestErrHandle(t *testing.T) {results := make([]string, len(urls))// 創建Group類型g := new(errgroup.Group)for index, url := range urls {// Launch a goroutine to fetch the URL.url := urlindex := index// 調用Go方法g.Go(func() error {// Fetch the URL.resp, err := http.Get(url)if err != nil {return err // 返回錯誤}defer resp.Body.Close()body, err := io.ReadAll(resp.Body)if err != nil {return err // 返回錯誤}results[index] = string(body)return nil})}// Wait for all HTTP fetches to complete.// 等待所有任務執行完成,并對錯誤進行處理if err := g.Wait(); err != nil {fmt.Println("Failured fetched all URLs.")}
}
?第一步,我們要創建 Group 類型的對象。
第二步,在 Group 的 Go 方法中傳入那些需要并發運行的函數。特別需要注意的是,這些傳入的函數必須將錯誤返回。
第三步,也是最后一步,在主協程中,我們需要調用 Group 對象的 Wait 方法。通過這一調用,主協程將會阻塞等待,直至所有通過 Go 方法傳入的任務都執行完畢。并且,在任務完成后,我們還能夠對 Wait 方法所返回的錯誤進行處理。
func TestLimitGNum(t *testing.T) {results := make([]string, len(urls))// 用WithContext函數創建Group對象eg, ctx := errgroup.WithContext(context.Background())// 調用SetLimit方法,設置可同時運行的最大協程數eg.SetLimit(2)for index, url := range urls {url := urlindex := index// 調用Go方法eg.Go(func() error {select {case <-ctx.Done(): // select-done模式取消運行return errors.New("task is cancelled")default:// 并發獲取urlresp, err := http.Get(url)if err != nil {return err // 返回錯誤}defer resp.Body.Close()body, err := io.ReadAll(resp.Body)if err != nil {return err // 返回錯誤}results[index] = string(body)return nil}})}// 等待所有任務執行完成,并對錯誤進行處理if err := eg.Wait(); err != nil {fmt.Println("Failured fetched all URLs.")}
}
?errorGroup 包中的結構體
type token struct{}type Group struct {cancel func(error) // 這個作用是為了前面說的 WithContext 而來的wg sync.WaitGroup // errGroup底層的阻塞等待功能,就是通過WaitGroup實現的sem chan token // 用于控制最大運行的協程數err error // 最后在Wait方法中返回的errorerrOnce sync.Once // 用于安全的設置err
}
總結:
1. WaitGroup類型是Golang的基礎并發類型,用于阻塞等待多個并發任務執行完成,包含Add、Done和Wait方法。
2. errgroup包是Golang提供的并發擴展庫,對WaitGroup進行了封裝,在并發等待的基礎功能上提供了錯誤處理和任務取消功能。
3. Group類型的Go方法用于傳入具有錯誤返回值的函數類型,Wait方法會阻塞等待所有傳入Go方法的函數全部運行完畢,并且在任務完成后能夠對錯誤進行處理。
4. 任務取消功能通過WithContext函數創建Group對象,傳入Go方法的函數需要實現select-done模式,利用context來停止所有相關任務。
5. errgroup包還可以限制同時并發運行的最大協程數,通過SetLimit方法設置可同時運行的最大協程數,達到最大協程數時會阻塞創建新協程運行任務。