源碼地址:https://github.com/k8scat/docker-log-driver-tencent-cls
在現代云原生架構中,容器化應用已經成為主流部署方式。隨著容器數量的快速增長,如何高效地收集、存儲和分析容器日志成為了一個關鍵挑戰。傳統的日志收集方式往往存在以下問題:
- 日志分散在各個容器中,難以統一管理
- 缺乏結構化的日志格式,不利于后續分析
- 日志存儲成本高,且難以進行實時查詢
- 缺乏統一的日志檢索和監控機制
為了解決這些問題,我們開發了一個專門的 Docker 日志驅動,將容器日志直接發送到騰訊云的 CLS(Cloud Log Service)日志服務。這個驅動實現了與 Docker 日志系統的深度集成,提供了高性能、可靠的日志傳輸能力。
技術架構設計
整體架構
該日志驅動采用了模塊化的設計架構,主要包含以下幾個核心組件:
- Driver 模塊:負責管理日志流和容器生命周期
- Logger 模塊:處理日志格式化和發送邏輯
- Client 模塊:封裝騰訊云 CLS SDK 的調用
- Server 模塊:提供 Docker 插件接口服務
- 配置管理模塊:處理各種配置參數的解析和驗證
這種分層架構確保了代碼的可維護性和可擴展性,每個模塊都有明確的職責邊界。
核心數據結構
項目定義了多個關鍵的數據結構來支持日志驅動的功能:
type Driver struct {streams map[string]*logStreamcontainerStreams map[string]*logStreammu sync.RWMutexfs fileSystemnewTencentCLSLogger newTencentCLSLoggerFuncprocessLogs func(stream *logStream)logger *zap.Logger
}type TencentCLSLogger struct {client clientformatter *messageFormattercfg *loggerConfigbuffer chan stringmu sync.MutexpartialLogsBuffer *partialLogBufferwg sync.WaitGroupclosed chan struct{}logger *zap.Logger
}
這些數據結構的設計充分考慮了并發安全性和資源管理,確保了在高并發場景下的穩定運行。
核心功能實現
日志流管理
日志驅動的核心功能是管理容器的日志流。每個容器啟動時,驅動會創建一個獨立的日志流來處理該容器的所有日志輸出:
func (d *Driver) StartLogging(streamPath string, containerDetails *ContainerDetails) (stream *logStream, err error) {d.logger.Info("starting logging", zap.String("stream_path", streamPath), zap.Any("container_details", containerDetails))d.mu.RLock()if _, ok := d.streams[streamPath]; ok {d.mu.RUnlock()return nil, errors.New("already logging")}d.mu.RUnlock()name := "container:" + containerDetails.ContainerNamestream = &logStream{streamPath: streamPath,containerDetails: containerDetails,logger: d.logger.Named(name),fs: d.fs,stop: make(chan struct{}),}// 初始化日志流if err := d.initializeStream(stream); err != nil {return nil, err}// 啟動日志處理協程go d.processLogs(stream)return stream, nil
}
這種設計確保了每個容器的日志都能被獨立處理,避免了不同容器之間的日志混淆。
日志處理流程
日志處理采用了異步非阻塞的設計模式,確保不會影響容器的正常運行:
func (d *Driver) defaultProcessLogs(stream *logStream, processedNotifier chan<- struct{}) {defer func() {if err := stream.Close(); err != nil {d.logger.Error("failed to close stream", zap.Error(err))}}()logs := NewLogs(stream)for logs.Next(