實現了
- go多協程壓力測試
- 實現了Monitor,異步統計qps、時延、cpu(client端)等指標,周期printStat。只需要把單條執行func傳給Monitor即可
- 命令行傳參
- ctrl+c之後正常退出
- (mock cpu 占用)
代碼見 https://gitee.com/bbjg001/golearning/tree/master/others/stress_monitor
執行
go run *.go -action w -routine 5 -duration 30s
效果
更多的
可以把收集到的指標轉給 /metrics 接口,這樣就可以從外部收集指標:提供給 prometheus 收集,再接入 grafana 實現可視化監控展示
附
util.go
package mainimport ("context""fmt""math/rand""os""sync/atomic""time""github.com/rcrowley/go-metrics""github.com/shirou/gopsutil/v3/process"
)type SCMonitor struct {registry metrics.Registryinterval time.DurationstopChan chan struct{}qpsCounter metrics.CounterlatencyTimer metrics.TimertotalRequests uint64failedRequests uint64lastCompute time.Time
}func NewSCMonitor(interval time.Duration) *SCMonitor {registry := metrics.NewRegistry()return &SCMonitor{registry: registry,interval: interval,stopChan: make(chan struct{}),qpsCounter: metrics.NewCounter(),latencyTimer: metrics.NewTimer(),}
}func (m *SCMonitor) Start() {// 注冊指標m.registry.Register("requests.qps", m.qpsCounter)m.registry.Register("requests.latency", m.latencyTimer)// CPU監控cpuGauge := metrics.NewGauge()m.registry.Register("system.cpu", cpuGauge)m.lastCompute = time.Now()go func() {p, err := process.NewProcess(int32(os.Getpid()))if err != nil {fmt.Printf("Failed to init process monitor: %v\n", err)return}ticker := time.NewTicker(m.interval)defer ticker.Stop()// fmt.Printf("time\tqps\tcpu\tlatency999\tlatency99\tlatency9\tlatencyMean\n")fmt.Printf("%-12s%-30s%-10s%-16s%-16s%-16s%-16s\n","time", "qps", "cpu", "latency999", "latency99", "latency9", "latencyMean")for {select {case <-m.stopChan:returncase <-ticker.C:// 更新CPU指標if percent, err := p.Percent(0); err == nil {cpuGauge.Update(int64(percent * 100))}// 打印周期報告m.PrintReport()}}}()
}func (m *SCMonitor) Monitor(ctx context.Context, work func() (duration time.Duration, succeed bool)) {for {select {case <-ctx.Done():returndefault:duration, succeed := work()m.RecordRequest(duration, succeed)}}
}func (m *SCMonitor) RecordRequest(latency time.Duration, succeed bool) {atomic.AddUint64(&m.totalRequests, 1)if !succeed {atomic.AddUint64(&m.failedRequests, 1)}m.qpsCounter.Inc(1)m.latencyTimer.Update(latency)
}func (m *SCMonitor) PrintReport() {now := time.Now().Format("15:04:05")// fmt.Printf("\n=== Metrics Report @ %s ===\n", now)fmt.Printf("%-12s", now)// QPS計算if qps := m.registry.Get("requests.qps"); qps != nil {counter := qps.(metrics.Counter)interval := time.Since(m.lastCompute).Milliseconds()rate := float64(counter.Count()*1000) / float64(interval)qpsStr := fmt.Sprintf("%.1f (Total: %d)", rate, atomic.LoadUint64(&m.totalRequests))fmt.Printf("%-30s", qpsStr)counter.Clear()}m.lastCompute = time.Now()// CPU使用率if cpu := m.registry.Get("system.cpu"); cpu != nil {cpuStr := fmt.Sprintf("%.1f%%\t", float64(cpu.(metrics.Gauge).Value())/100)fmt.Printf("%-10s", cpuStr)}// 時延統計if timer := m.registry.Get("requests.latency"); timer != nil {t := timer.(metrics.Timer)fmt.Printf("%-16.2f%-16.2f%-16.2f%-16.2f\n", t.Percentile(0.999)/1000000, t.Percentile(0.99)/1000000,t.Percentile(0.9)/1000000, t.Mean()/1000000)}
}func (m *SCMonitor) Stop() {close(m.stopChan)m.PrintReport() // 最終報告
}// 模擬cpu負載
func mockCpuLoad(loadPercentage int, durationSecond int) {workTime := time.Duration(loadPercentage) * time.MillisecondsleepTime := time.Duration(100-loadPercentage) * time.Millisecondctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(durationSecond))defer cancel()for {select {case <-ctx.Done():returndefault:start := time.Now()// 在工作時間內進行計算for time.Since(start) < workTime {// 執行一些計算密集型操作_ = rand.Intn(1000) * rand.Intn(1000)}time.Sleep(sleepTime)}}
}
main.go
package mainimport ("context""flag""fmt""math/rand""os""os/signal""syscall""time"
)var (testCfg = TConfig{}values = make([]string, 1000)baseVal string
)type TConfig struct {routine intaction stringduration time.Duration
}func (c TConfig) String() string {return "=============測試參數\n" +fmt.Sprintf("routine: %d\n", c.routine) +fmt.Sprintf("action: %s\n", c.action) +fmt.Sprintf("duration: %v\n", c.duration)
}func init() {parseFlags()
}func parseFlags() {flag.IntVar(&testCfg.routine, "routine", 1, "線程數")flag.StringVar(&testCfg.action, "action", "r", "action, w|r")flag.DurationVar(&testCfg.duration, "duration", time.Minute, "測試時長")flag.Parse()
}func doWrite(uid int64) (time.Duration, bool) {// 此處uit只做傳參示例d := int(rand.Float64() * 10000)startTime := time.Now()time.Sleep(time.Microsecond * time.Duration(d))succeed := rand.Float64() < 0.002return time.Since(startTime), succeed
}func doRead(uid int64) (time.Duration, bool) {d := int(rand.Float64() * 5000)startTime := time.Now()time.Sleep(time.Microsecond * time.Duration(d))succeed := rand.Float64() < 0.0001return time.Since(startTime), succeed
}func main() {fmt.Println(testCfg.String())fmt.Println("可Ctrl+C退出")go mockCpuLoad(15, 12) // 模擬cpu負載start := time.Now()ctx, cancel := context.WithTimeout(context.Background(), testCfg.duration)defer cancel()signalCh := make(chan os.Signal, 1)signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)monitor := NewSCMonitor(3 * time.Second)monitor.Start()defer monitor.Stop()for i := 0; i < testCfg.routine; i++ {if testCfg.action == "w" {go monitor.Monitor(ctx, func() (duration time.Duration, succeed bool) { // 只需要傳入一個控制測試停止的Context和M一個單條條執行的func即可,其他的的交給Monitorreturn doWrite(int64(i))})} else if testCfg.action == "r" {go monitor.Monitor(ctx, func() (duration time.Duration, succeed bool) {return doRead(int64(i))})} else {panic(fmt.Sprintf("unsupport param action: %s", testCfg.action))}}select {case <-ctx.Done():fmt.Println("測試完成, 正在退出 ...")breakcase <-signalCh: // 收到退出信號,正常處理退出testCfg.duration = time.Since(start)fmt.Println("收到終止信號, 正在退出 ...")cancel()break}fmt.Println(testCfg.String())
}