在云原生時代和容器化浪潮中,容器的日志采集是一個看起來不起眼卻又無法忽視的重要議題。對于容器日志采集我們常用的工具有filebeat和fluentd,兩者對比各有優劣,相比基于ruby的fluentd,考慮到可定制性,我們一般默認選擇golang技術棧的filbeat作為主力的日志采集agent。
相比較傳統的日志采集方式,容器化下單節點會運行更多的服務,負載也會有更短的生命周期,而這些更容易對日志采集agent造成壓力,雖然filebeat足夠輕量級和高性能,但如果不了解filebeat的機制,不合理的配置filebeat,實際的生產環境使用中可能也會給我們帶來意想不到的麻煩和難題。
整體架構
日志采集的功能看起來不復雜,主要功能無非就是找到配置的日志文件,然后讀取并處理,發送至相應的后端如elasticsearch,kafka等。
filebeat官網有張示意圖,如下所示:

針對每個日志文件,filebeat都會啟動一個harvester協程,即一個goroutine,在該goroutine中不停的讀取日志文件,直到文件的EOF末尾。一個最簡單的表示采集目錄的input配置大概如下所示:
filebeat.inputs:
- type: log# Paths that should be crawled and fetched. Glob based paths.paths:- /var/log/*.log
不同的harvester goroutine采集到的日志數據都會發送至一個全局的隊列queue中,queue的實現有兩種:基于內存和基于磁盤的隊列,目前基于磁盤的隊列還是處于alpha階段,filebeat默認啟用的是基于內存的緩存隊列。
每當隊列中的數據緩存到一定的大小或者超過了定時的時間(默認1s),會被注冊的client從隊列中消費,發送至配置的后端。目前可以設置的client有kafka、elasticsearch、redis等。
雖然這一切看著挺簡單,但在實際使用中,我們還是需要考慮更多的問題,例如:
- 日志文件是如何被filbebeat發現又是如何被采集的?
- filebeat是如何確保日志采集發送到遠程的存儲中,不丟失一條數據的?
- 如果filebeat掛掉,下次采集如何確保從上次的狀態開始而不會重新采集所有日志?
- filebeat的內存或者cpu占用過多,該如何分析解決?
- filebeat如何支持docker和kubernetes,如何配置容器化下的日志采集?
- 想讓filebeat采集的日志發送至的后端存儲,如果原生不支持,怎樣定制化開發?
這些均需要對filebeat有更深入的理解,下面讓我們跟隨filebeat的源碼一起探究其中的實現機制。
一條日志是如何被采集的
filebeat源碼歸屬于beats項目,而beats項目的設計初衷是為了采集各類的數據,所以beats抽象出了一個libbeat庫,基于libbeat我們可以快速的開發實現一個采集的工具,除了filebeat,還有像metricbeat、packetbeat等官方的項目也是在beats工程中。
如果我們大致看一下代碼就會發現,libbeat已經實現了內存緩存隊列memqueue、幾種output日志發送客戶端,數據的過濾處理processor等通用功能,而filebeat只需要實現日志文件的讀取等和日志相關的邏輯即可。
從代碼的實現角度來看,filebeat大概可以分以下幾個模塊:
- input: 找到配置的日志文件,啟動harvester
- harvester: 讀取文件,發送至spooler - spooler: 緩存日志數據,直到可以發送至publisher
- publisher: 發送日志至后端,同時通知registrar
- registrar: 記錄日志文件被采集的狀態
1. 找到日志文件
對于日志文件的采集和生命周期管理,filebeat抽象出一個Crawler的結構體, 在filebeat啟動后,crawler會根據配置創建,然后遍歷并運行每個input:
for _, inputConfig := range c.inputConfigs {err := c.startInput(pipeline, inputConfig, r.GetStates())}
在每個input運行的邏輯里,首先會根據配置獲取匹配的日志文件,需要注意的是,這里的匹配方式并非正則,而是采用linux glob的規則,和正則還是有一些區別。
matches, err := filepath.Glob(path)
獲取到了所有匹配的日志文件之后,會經過一些復雜的過濾,例如如果配置了exclude_files
則會忽略這類文件,同時還會查詢文件的狀態,如果文件的最近一次修改時間大于ignore_older
的配置,也會不去采集該文件。
2. 讀取日志文件
匹配到最終需要采集的日志文件之后,filebeat會對每個文件啟動harvester goroutine,在該goroutine中不停的讀取日志,并發送給內存緩存隊列memqueue。
在(h *Harvester) Run()
方法中,我們可以看到這么一個無限循環,省略了一些邏輯的代碼如下所示:
for {message, err := h.reader.Next()if err != nil {switch err {case ErrFileTruncate:logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source)h.state.Offset = 0filesTruncated.Add(1)case ErrRemoved:logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source)case ErrRenamed:logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source)case ErrClosed:logp.Info("Reader was closed: %s. Closing.", h.state.Source)case io.EOF:logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)case ErrInactive:logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)default:logp.Err("Read line error: %v; File: %v", err, h.state.Source)}return nil}...if !h.sendEvent(data, forwarder) {return nil}
}
可以看到,reader.Next()方法會不停的讀取日志,如果沒有返回異常,則發送日志數據到緩存隊列中。
返回的異常有幾種類型,除了讀取到EOF外,還會有例如文件一段時間不活躍等情況發生會使harvester goroutine退出,不再采集該文件,并關閉文件句柄。
filebeat為了防止占據過多的采集日志文件的文件句柄,默認的close_inactive
參數為5min,如果日志文件5min內沒有被修改,上面代碼會進入ErrInactive的case,之后該harvester goroutine會被關閉。
這種場景下還需要注意的是,如果某個文件日志采集中被移除了,但是由于此時被filebeat保持著文件句柄,文件占據的磁盤空間會被保留直到harvester goroutine結束。
3. 緩存隊列
在memqueue被初始化時,filebeat會根據配置min_event
是否大于1創建BufferingEventLoop或者DirectEventLoop,一般默認都是BufferingEventLoop,即帶緩沖的隊列。
type bufferingEventLoop struct {broker *Brokerbuf *batchBufferflushList flushListeventCount intminEvents intmaxEvents intflushTimeout time.Duration// active broker API channelsevents chan pushRequestget chan getRequestpubCancel chan producerCancelRequest// ack handlingacks chan int // ackloop -> eventloop : total number of events ACKed by outputsschedACKS chan chanList // eventloop -> ackloop : active list of batches to be ackedpendingACKs chanList // ordered list of active batches to be send to the ackloopackSeq uint // ack batch sequence number to validate ordering// buffer flush timer statetimer *time.TimeridleC <-chan time.Time
}
BufferingEventLoop是一個實現了Broker、帶有各種channel的結構,主要用于將日志發送至consumer消費。 BufferingEventLoop的run方法中,同樣是一個無限循環,這里可以認為是一個日志事件的調度中心。
for {select {case <-broker.done:returncase req := <-l.events: // producer pushing new eventl.handleInsert(&req)case req := <-l.get: // consumer asking for next batchl.handleConsumer(&req)case count := <-l.acks:l.handleACK(count)case <-l.idleC:l.idleC = nill.timer.Stop()if l.buf.length() > 0 {l.flushBuffer()}}}
上文中harvester goroutine每次讀取到日志數據之后,最終會被發送至bufferingEventLoop中的events chan pushRequest
channel,然后觸發上面req := <-l.events
的case,handleInsert方法會把數據添加至bufferingEventLoop的buf中,buf即memqueue實際緩存日志數據的隊列,如果buf長度超過配置的最大值或者bufferingEventLoop中的timer定時器觸發了case <-l.idleC
,均會調用flushBuffer()方法。
flushBuffer()又會觸發req := <-l.get
的case,然后運行handleConsumer方法,該方法中最重要的是這一句代碼:
req.resp <- getResponse{ackChan, events}
這里獲取到了consumer消費者的response channel,然后發送數據給這個channel。真正到這,才會觸發consumer對memqueue的消費。所以,其實memqueue并非一直不停的在被consumer消費,而是在memqueue通知consumer的時候才被消費,我們可以理解為一種脈沖式的發送。
4. 消費隊列
實際上,早在filebeat初始化的時候,就已經創建了一個eventConsumer并在loop無限循環方法里試圖從Broker中獲取日志數據。
for {if !paused && c.out != nil && consumer != nil && batch == nil {out = c.out.workQueuequeueBatch, err := consumer.Get(c.out.batchSize)...batch = newBatch(c.ctx, queueBatch, c.out.timeToLive)}...select {case <-c.done:returncase sig := <-c.sig:handleSignal(sig)case out <- batch:batch = nil}}
上面consumer.Get就是消費者consumer從Broker中獲取日志數據,然后發送至out的channel中被output client發送,我們看一下Get方法里的核心代碼:
select {case c.broker.requests <- getRequest{sz: sz, resp: c.resp}:case <-c.done:return nil, io.EOF}// if request has been send, we do have to wait for a responseresp := <-c.respreturn &batch{consumer: c,events: resp.buf,ack: resp.ack,state: batchActive,}, nil
getRequest的結構如下:
type getRequest struct {sz int // request sz events from the brokerresp chan getResponse // channel to send response to
}
getResponse的結構:
type getResponse struct {ack *ackChanbuf []publisher.Event
}
getResponse里包含了日志的數據,而getRequest包含了一個發送至消費者的channel。
在上文bufferingEventLoop緩沖隊列的handleConsumer方法里接收到的參數為getRequest,里面包含了consumer請求的getResponse channel。
如果handleConsumer不發送數據,consumer.Get方法會一直阻塞在select中,直到flushBuffer,consumer的getResponse channel才會接收到日志數據。
5. 發送日志
在創建beats時,會創建一個clientWorker,clientWorker的run方法中,會不停的從consumer發送的channel里讀取日志數據,然后調用client.Publish批量發送日志。
func (w *clientWorker) run() {for !w.closed.Load() {for batch := range w.qu {if err := w.client.Publish(batch); err != nil {return}}}
}
libbeats庫中包含了kafka、elasticsearch、logstash等幾種client,它們均實現了client接口:
type Client interface {Close() errorPublish(publisher.Batch) errorString() string
}
當然最重要的是實現Publish接口,然后將日志發送出去。
實際上,filebeat中日志數據在各種channel里流轉的設計還是比較復雜和繁瑣的,筆者也是研究了好久、畫了很長的架構圖才理清楚其中的邏輯。 這里抽出了一個簡化后的圖以供參考:

如何保證at least once
filebeat維護了一個registry文件在本地的磁盤,該registry文件維護了所有已經采集的日志文件的狀態。 實際上,每當日志數據發送至后端成功后,會返回ack事件。filebeat啟動了一個獨立的registry協程負責監聽該事件,接收到ack事件后會將日志文件的State狀態更新至registry文件中,State中的Offset表示讀取到的文件偏移量,所以filebeat會保證Offset記錄之前的日志數據肯定被后端的日志存儲接收到。
State結構如下所示:
type State struct {Id string `json:"-"` // local unique id to make comparison more efficientFinished bool `json:"-"` // harvester stateFileinfo os.FileInfo `json:"-"` // the file infoSource string `json:"source"`Offset int64 `json:"offset"`Timestamp time.Time `json:"timestamp"`TTL time.Duration `json:"ttl"`Type string `json:"type"`Meta map[string]string `json:"meta"`FileStateOS file.StateOS
}
記錄在registry文件中的數據大致如下所示:
[{"source":"/tmp/aa.log","offset":48,"timestamp":"2019-07-03T13:54:01.298995+08:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":7048952,"device":16777220}}]
由于文件可能會被改名或移動,filebeat會根據inode和設備號來標志每個日志文件。
如果filebeat異常重啟,每次采集harvester啟動的時候都會讀取registry文件,從上次記錄的狀態繼續采集,確保不會從頭開始重復發送所有的日志文件。
當然,如果日志發送過程中,還沒來得及返回ack,filebeat就掛掉,registry文件肯定不會更新至最新的狀態,那么下次采集的時候,這部分的日志就會重復發送,所以這意味著filebeat只能保證at least once,無法保證不重復發送。
還有一個比較異常的情況是,linux下如果老文件被移除,新文件馬上創建,很有可能它們有相同的inode,而由于filebeat根據inode來標志文件記錄采集的偏移,會導致registry里記錄的其實是被移除的文件State狀態,這樣新的文件采集卻從老的文件Offset開始,從而會遺漏日志數據。
為了盡量避免inode被復用的情況,同時防止registry文件隨著時間增長越來越大,建議使用clean_inactive和clean_remove配置將長時間未更新或者被刪除的文件State從registry中移除。
同時我們可以發現在harvester讀取日志中,會更新registry的狀態處理一些異常場景。例如,如果一個日志文件被清空,filebeat會在下一次Reader.Next方法中返回ErrFileTruncate異常,將inode標志文件的Offset置為0,結束這次harvester,重新啟動新的harvester,雖然文件不變,但是registry中的Offset為0,采集會從頭開始。
特別注意的是,如果使用容器部署filebeat,需要將registry文件掛載到宿主機上,否則容器重啟后registry文件丟失,會使filebeat從頭開始重復采集日志文件。
filebeat自動reload更新
目前filebeat支持reload input配置,module配置,但reload的機制只有定時更新。
在配置中打開reload.enable之后,還可以配置reload.period表示自動reload配置的時間間隔。
filebeat在啟動時,會創建一個專門用于reload的協程。對于每個正在運行的harvester,filebeat會將其加入一個全局的Runner列表,每次到了定時的間隔后,會觸發一次配置文件的diff判斷,如果是需要停止的加入stopRunner列表,然后逐個關閉,新的則加入startRunner列表,啟動新的Runner。
filebeat對kubernetes的支持
filebeat官方文檔提供了在kubernetes下基于daemonset的部署方式,最主要的一個配置如下所示:
- type: dockercontainers.ids:- "*"processors:- add_kubernetes_metadata:in_cluster: true
即設置輸入input為docker類型。由于所有的容器的標準輸出日志默認都在節點的/var/lib/docker/containers/<containerId>/*-json.log
路徑,所以本質上采集的是這類日志文件。
和傳統的部署方式有所區別的是,如果服務部署在kubernetes上,我們查看和檢索日志的維度不能僅僅局限于節點和服務,還需要有podName,containerName等,所以每條日志我們都需要打標增加kubernetes的元信息才發送至后端。
filebeat會在配置中增加了add_kubernetes_metadata的processor的情況下,啟動監聽kubernetes的watch服務,監聽所有kubernetes pod的變更,然后將歸屬本節點的pod最新的事件同步至本地的緩存中。
節點上一旦發生容器的銷毀創建,/var/lib/docker/containers/下會有目錄的變動,filebeat根據路徑提取出containerId,再根據containerId從本地的緩存中找到pod信息,從而可以獲取到podName、label等數據,并加到日志的元信息fields中。
filebeat還有一個beta版的功能autodiscover,autodiscover的目的是把分散到不同節點上的filebeat配置文件集中管理。目前也支持kubernetes作為provider,本質上還是監聽kubernetes事件然后采集docker的標準輸出文件。
大致架構如下所示:

但是在實際生產環境使用中,僅采集容器的標準輸出日志還是遠遠不夠,我們往往還需要采集容器掛載出來的自定義日志目錄,還需要控制每個服務的日志采集方式以及更多的定制化功能。
在輕舟容器云上,我們自研了一個監聽kubernetes事件自動生成filebeat配置的agent,通過CRD的方式,支持自定義容器內部日志目錄、支持自定義fields、支持多行讀取等功能。同時可在kubernetes上統一管理各種日志配置,而且無需用戶感知pod的創建銷毀和遷移,自動完成各種場景下的日志配置生成和更新。
性能分析與調優
雖然beats系列主打輕量級,雖然用golang寫的filebeat的內存占用確實比較基于jvm的logstash等好太多,但是事實告訴我們其實沒那么簡單。
正常啟動filebeat,一般確實只會占用3、40MB內存,但是在輕舟容器云上偶發性的我們也會發現某些節點上的filebeat容器內存占用超過配置的pod limit限制(一般設置為200MB),并且不停的觸發的OOM。
究其原因,一般容器化環境中,特別是裸機上運行的容器個數可能會比較多,導致創建大量的harvester去采集日志。如果沒有很好的配置filebeat,會有較大概率導致內存急劇上升。
當然,filebeat內存占據較大的部分還是memqueue,所有采集到的日志都會先發送至memqueue聚集,再通過output發送出去。每條日志的數據在filebeat中都被組裝為event結構,filebeat默認配置的memqueue緩存的event個數為4096,可通過queue.mem.events
設置。默認最大的一條日志的event大小限制為10MB,可通過max_bytes
設置。4096 * 10MB = 40GB
,可以想象,極端場景下,filebeat至少占據40GB的內存。特別是配置了multiline多行模式的情況下,如果multiline配置有誤,單個event誤采集為上千條日志的數據,很可能導致memqueue占據了大量內存,致使內存爆炸。
所以,合理的配置日志文件的匹配規則,限制單行日志大小,根據實際情況配置memqueue緩存的個數,才能在實際使用中規避filebeat的內存占用過大的問題。
如何對filebeat進行擴展開發
一般情況下filebeat可滿足大部分的日志采集需求,但是仍然避免不了一些特殊的場景需要我們對filebeat進行定制化開發,當然filebeat本身的設計也提供了良好的擴展性。
beats目前只提供了像elasticsearch、kafka、logstash等幾類output客戶端,如果我們想要filebeat直接發送至其他后端,需要定制化開發自己的output。同樣,如果需要對日志做過濾處理或者增加元信息,也可以自制processor插件。
無論是增加output還是寫個processor,filebeat提供的大體思路基本相同。一般來講有3種方式:
- 直接fork filebeat,在現有的源碼上開發。output或者processor都提供了類似Run、Stop等的接口,只需要實現該類接口,然后在init方法中注冊相應的插件初始化方法即可。當然,由于golang中init方法是在import包時才被調用,所以需要在初始化filebeat的代碼中手動import。
- 復制一份filebeat的main.go,import我們自研的插件庫,然后重新編譯。本質上和方式1區別不大。
- filebeat還提供了基于golang plugin的插件機制,需要把自研的插件編譯成.so共享鏈接庫,然后在filebeat啟動參數中通過-plugin指定庫所在路徑。不過實際上一方面golang plugin還不夠成熟穩定,一方面自研的插件依然需要依賴相同版本的libbeat庫,而且還需要相同的golang版本編譯,坑可能更多,不太推薦。