Go進階高并發處理教程
目錄
- Go并發編程基礎
- Goroutine深入理解
- 同步原語詳解
- 并發模式與最佳實踐
- 性能優化技巧
- 實戰案例
Go并發編程基礎
什么是并發?
并發是指程序能夠同時處理多個任務的能力。Go語言從設計之初就將并發作為核心特性,提供了簡潔而強大的并發編程模型。
Go并發模型的優勢
- 輕量級協程:Goroutine比傳統線程更輕量
- CSP模型:通過通信來共享內存,而不是通過共享內存來通信
- 內置調度器:Go運行時自動管理goroutine的調度
Goroutine深入理解
創建和啟動Goroutine
package mainimport ("fmt""time"
)func worker(id int) {fmt.Printf("Worker %d starting\n", id)time.Sleep(time.Second)fmt.Printf("Worker %d done\n", id)
}func main() {// 啟動多個goroutinefor i := 1; i <= 5; i++ {go worker(i)}// 等待所有goroutine完成time.Sleep(2 * time.Second)fmt.Println("All workers completed")
}
Goroutine的生命周期
- 創建:使用
go
關鍵字創建 - 調度:由Go調度器管理
- 執行:在可用的OS線程上執行
- 結束:函數返回時自動結束
調度器工作原理
Go使用M:N調度模型:
- M:OS線程(Machine)
- P:處理器(Processor)
- G:Goroutine
G1 G2 G3 G4\ | | /\ | | /\ | |/\| |P1 P2| |M1 M2
同步原語詳解
sync.WaitGroup
用于等待一組goroutine完成:
package mainimport ("fmt""sync""time"
)func worker(id int, wg *sync.WaitGroup) {defer wg.Done() // 完成時調用Done()fmt.Printf("Worker %d starting\n", id)time.Sleep(time.Second)fmt.Printf("Worker %d done\n", id)
}func main() {var wg sync.WaitGroupfor i := 1; i <= 5; i++ {wg.Add(1) // 增加等待計數go worker(i, &wg)}wg.Wait() // 等待所有goroutine完成fmt.Println("All workers completed")
}
sync.Mutex
互斥鎖用于保護共享資源:
package mainimport ("fmt""sync"
)type Counter struct {mu sync.Mutexvalue int
}func (c *Counter) Increment() {c.mu.Lock()defer c.mu.Unlock()c.value++
}func (c *Counter) Value() int {c.mu.Lock()defer c.mu.Unlock()return c.value
}func main() {counter := &Counter{}var wg sync.WaitGroup// 啟動100個goroutine同時增加計數器for i := 0; i < 100; i++ {wg.Add(1)go func() {defer wg.Done()for j := 0; j < 1000; j++ {counter.Increment()}}()}wg.Wait()fmt.Printf("Final counter value: %d\n", counter.Value())
}
sync.RWMutex
讀寫鎖允許多個讀操作同時進行:
type SafeMap struct {mu sync.RWMutexdata map[string]int
}func (sm *SafeMap) Get(key string) (int, bool) {sm.mu.RLock()defer sm.mu.RUnlock()val, ok := sm.data[key]return val, ok
}func (sm *SafeMap) Set(key string, value int) {sm.mu.Lock()defer sm.mu.Unlock()sm.data[key] = value
}
sync.Once
確保某個操作只執行一次:
package mainimport ("fmt""sync"
)var once sync.Once
var instance *Singletontype Singleton struct {data string
}func GetInstance() *Singleton {once.Do(func() {fmt.Println("Creating singleton instance")instance = &Singleton{data: "singleton"}})return instance
}func main() {var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(id int) {defer wg.Done()s := GetInstance()fmt.Printf("Goroutine %d got instance: %s\n", id, s.data)}(i)}wg.Wait()
}
并發模式與最佳實踐
Worker Pool模式
package mainimport ("fmt""sync""time"
)type Job struct {ID intData string
}type Result struct {Job JobOutput string
}func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {defer wg.Done()for job := range jobs {fmt.Printf("Worker %d processing job %d\n", id, job.ID)time.Sleep(time.Millisecond * 100) // 模擬工作result := Result{Job: job,Output: fmt.Sprintf("Processed by worker %d", id),}results <- result}
}func main() {const numWorkers = 3const numJobs = 10jobs := make(chan Job, numJobs)results := make(chan Result, numJobs)var wg sync.WaitGroup// 啟動workerfor i := 1; i <= numWorkers; i++ {wg.Add(1)go worker(i, jobs, results, &wg)}// 發送任務for i := 1; i <= numJobs; i++ {jobs <- Job{ID: i, Data: fmt.Sprintf("data-%d", i)}}close(jobs)// 等待所有worker完成go func() {wg.Wait()close(results)}()// 收集結果for result := range results {fmt.Printf("Job %d result: %s\n", result.Job.ID, result.Output)}
}
扇入扇出模式
// 扇出:將工作分發給多個goroutine
func fanOut(input <-chan int, workers int) []<-chan int {outputs := make([]<-chan int, workers)for i := 0; i < workers; i++ {output := make(chan int)outputs[i] = outputgo func(out chan<- int) {defer close(out)for n := range input {out <- n * n // 計算平方}}(output)}return outputs
}// 扇入:將多個channel的結果合并
func fanIn(inputs ...<-chan int) <-chan int {output := make(chan int)var wg sync.WaitGroupfor _, input := range inputs {wg.Add(1)go func(in <-chan int) {defer wg.Done()for n := range in {output <- n}}(input)}go func() {wg.Wait()close(output)}()return output
}
性能優化技巧
1. 合理設置GOMAXPROCS
import "runtime"func init() {// 設置使用的CPU核心數runtime.GOMAXPROCS(runtime.NumCPU())
}
2. 避免goroutine泄漏
// 錯誤示例:可能導致goroutine泄漏
func badExample() {ch := make(chan int)go func() {ch <- 1 // 如果沒有接收者,這個goroutine會永遠阻塞}()// 函數返回,但goroutine仍在運行
}// 正確示例:使用context控制goroutine生命周期
func goodExample(ctx context.Context) {ch := make(chan int, 1) // 使用緩沖channelgo func() {select {case ch <- 1:case <-ctx.Done():return}}()
}
3. 使用對象池減少GC壓力
import "sync"var pool = sync.Pool{New: func() interface{} {return make([]byte, 1024)},
}func processData(data []byte) {buf := pool.Get().([]byte)defer pool.Put(buf)// 使用buf處理數據
}
實戰案例
并發HTTP客戶端
package mainimport ("fmt""net/http""sync""time"
)type Result struct {URL stringStatusCode intDuration time.DurationError error
}func fetchURL(url string, results chan<- Result, wg *sync.WaitGroup) {defer wg.Done()start := time.Now()resp, err := http.Get(url)duration := time.Since(start)result := Result{URL: url,Duration: duration,Error: err,}if err == nil {result.StatusCode = resp.StatusCoderesp.Body.Close()}results <- result
}func main() {urls := []string{"https://www.google.com","https://www.github.com","https://www.stackoverflow.com","https://www.golang.org",}results := make(chan Result, len(urls))var wg sync.WaitGroup// 并發請求所有URLfor _, url := range urls {wg.Add(1)go fetchURL(url, results, &wg)}// 等待所有請求完成go func() {wg.Wait()close(results)}()// 處理結果for result := range results {if result.Error != nil {fmt.Printf("Error fetching %s: %v\n", result.URL, result.Error)} else {fmt.Printf("%s: %d (%v)\n", result.URL, result.StatusCode, result.Duration)}}
}
總結
Go語言的并發編程提供了強大而簡潔的工具:
- Goroutine:輕量級協程,易于創建和管理
- Channel:類型安全的通信機制
- sync包:提供各種同步原語
- 并發模式:Worker Pool、扇入扇出等經典模式
掌握這些概念和技巧,能夠幫助您構建高性能、可擴展的并發應用程序。記住Go的并發哲學:通過通信來共享內存,而不是通過共享內存來通信。
參考資源
- Go官方文檔 - 并發
- Go并發模式
- Go內存模型