日志是系統運行的詳細記錄,包含各種事件發生的主體、時間、位置、內容等關鍵信息。出于運維可觀測、網絡安全監控及業務分析等多重需求,企業通常需要將分散的日志采集起來,進行集中存儲、查詢和分析,以進一步從日志數據里挖掘出有價值的內容。
針對此場景,Apache Doris 提供了相應解決方案,針對日志場景的特點,增加了倒排索引和極速全文檢索能力,極致優化寫入性能和存儲空間,使得用戶可以基于 Apache Doris 構建開放、高性能、低成本、統一的日志存儲與分析平臺。
本文將圍繞這一解決方案,介紹以下內容:
- 整體架構:說明基于 Apache Doris 構建的日志存儲與分析平臺的核心組成部分和基礎架構。
- 特點與優勢:說明基于 Apache Doris 構建的日志存儲與分析平臺的特點和優勢。
- 操作指南:說明如何基于 Apache Doris 構建日志存儲分析平臺。
整體架構?
基于 Apache Doris 構建的日志存儲與分析平臺的架構如下圖:
此架構主要由 3 大部分組成:
- 日志采集和預處理:多種日志采集工具可以通過 HTTP APIs 將日志數據寫入 Apache Doris。
- 日志存儲和分析引擎:Apache Doris 提供高性能、低成本的統一日志存儲,通過 SQL 接口提供豐富的檢索分析能力。
- 日志分析和告警界面:多種日志檢索分析通工具通過標準 SQL 接口查詢 Apache Doris,為用戶提供簡單易用的界面。
特點與優勢?
基于 Apache Doris 構建的日志存儲與分析平臺的特點和優勢如下:
- 高吞吐、低延遲日志寫入:支持每天百 TB 級、GB/s 級日志數據持續穩定寫入,同時保持延遲 1s 以內。
- 海量日志數據低成本存儲:支持 PB 級海量存儲,相對于 Elasticsearch 存儲成本節省 60% 到 80%,支持冷數據存儲到 S3/HDFS,存儲成本再降 50%。
- 高性能日志全文檢索分析:支持倒排索引和全文檢索,日志場景常見查詢(關鍵詞檢索明細、趨勢分析等)秒級響應。
- 開放、易用的上下游生態:上游通過 Stream Load 通用 HTTP APIs 對接常見的日志采集系統和數據源 Logstash、Filebeat、Fluentbit、Kafka 等,下游通過標準 MySQL 協議和語法對接各種可視化分析 UI,比如可觀測性 Grafana、BI 分析 Superset、類 Kibana 的日志檢索 Doris WebUI。
高性能、低成本?
經過 Benchmark 測試及生產驗證,基于 Apache Doris 構建的日志存儲與分析平臺,性價比相對于 Elasticsearch 具有 5~10 倍的提升。Apache Doris 的性能優勢,主要得益于全球領先的高性能存儲和查詢引擎,以及下面一些針對日志場景的專門優化:
- 寫入吞吐提升:Elasticsearch 寫入的性能瓶頸在于解析數據和構建倒排索引的 CPU 消耗。相比之下,Apache Doris 進行了兩方面的寫入優化:一方面利用 SIMD 等 CPU 向量化指令提升了 JSON 數據解析速度和索引構建性能;另一方面針對日志場景簡化了倒排索引結構,去掉日志場景不需要的正排等數據結構,有效降低了索引構建的復雜度。同樣的資源,Apache Doris 的寫入性能是 Elasticsearch 的 3~5 倍。
- 存儲成本降低:Elasticsearch 存儲瓶頸在于正排、倒排、Docvalue 列存多份存儲和通用壓縮算法壓縮率較低。相比之下,Apache Doris 在存儲上進行了以下優化:去掉正排,縮減了 30% 的索引數據量;采用列式存儲和 Zstandard 壓縮算法,壓縮比可達到 5~10 倍,遠高于 Elasticsearch 的 1.5 倍;日志數據中冷數據訪問頻率很低,Apache Doris 冷熱分層功能可以將超過定義時間段的日志自動存儲到更低的對象存儲中,冷數據的存儲成本可降低 70% 以上。同樣的原始數據,Doris 的存儲成本只需要 Elasticsearch 的 20% 左右。
- 查詢性能提升:Apache Doris 將全文檢索的流程簡化,跳過了相關性打分等日志場景不需要的算法,加速基礎的檢索性能。同時針對日志場景常見的查詢,比如查詢包含某個關鍵字的最新 100 條日志,在查詢規劃和執行上做專門的 TopN 動態剪枝等優化。
分析能力強?
Apache Doris 支持標準 SQL、兼容 MySQL 協議和語法,因此基于 Apache Doris 構建的日志系統能夠使用 SQL 進行日志分析,這使得日志系統具備以下優勢:
- 簡單易用:工程師和數據分析師對于 SQL 非常熟悉,經驗可以復用,不需要學習新的技術棧即可快速上手。
- 生態豐富:MySQL 生態是數據庫領域使用最廣泛的語言,因此可以與 MySQL 生態的集成和應用無縫銜接。Doris 可以利用 MySQL 命令行與各種 GUI 工具、BI 工具等大數據生態結合,實現更復雜及多樣化的數據處理分析需求。
- 分析能力強:SQL 語言已經成為數據庫和大數據分析的事實標準,它具有強大的表達能力和功能,支持檢索、聚合、多表 JOIN、子查詢、UDF、邏輯視圖、物化視圖等多種數據分析能力。
Flexible Schema?
下面是一個典型的 JSON 格式半結構化日志樣例。頂層字段是一些比較固定的字段,比如日志時間戳(timestamp
),日志來源(source
),日志所在機器(node
),打日志的模塊(component
),日志級別(level
),客戶端請求標識(clientRequestId
),日志內容(message
),日志擴展屬性(properties
),基本上每條日志都會有。而擴展屬性?properties
?的內部嵌套字段?properties.size
、properties.format
?等是比較動態的,每條日志的字段可能不一樣。
{ "timestamp": "2014-03-08T00:50:03.8432810Z","source": "ADOPTIONCUSTOMERS81","node": "Engine000000000405","level": "Information","component": "DOWNLOADER","clientRequestId": "671db15d-abad-94f6-dd93-b3a2e6000672","message": "Downloading file path: benchmark/2014/ADOPTIONCUSTOMERS81_94_0.parquet.gz","properties": {"size": 1495636750,"format": "parquet","rowCount": 855138,"downloadDuration": "00:01:58.3520561"}
}
Apache Doris 對 Flexible Schema 的日志數據提供了幾個方面的支持:
- 對于頂層字段的少量變化,可以通過 Light Schema Change 發起 ADD / DROP COLUMN 增加 / 刪除列,ADD / DROP INDEX 增加 / 刪除索引,能夠在秒級完成 Schema 變更。用戶在日志平臺規劃時只需考慮當前需要哪些字段創建索引。
- 對于類似?
properties
?的擴展字段,提供了原生半結構化數據類型?VARIANT
,可以寫入任何 JSON 數據,自動識別 JSON 中的字段名和類型,并自動拆分頻繁出現的字段采用列式存儲,以便于后續的分析,還可以對?VARIANT
?創建倒排索引,加快內部字段的查詢和檢索。
相對于 Elasticsearch 的 Dynamic Mapping,Apache Doris 的 Flexible Schema 有以下優勢:
- 允許一個字段有多種類型,
VARIANT
?自動對字段類型做沖突處理和類型提升,更好地適應日志數據的迭代變化。 VARIANT
?自動將不頻繁出現的字段合并成一個列存儲,可避免字段、元數據、列過多導致性能問題。- 不僅可以動態加列,還可以動態刪列、動態增加索引、動態刪索引,無需像 Elasticsearch 在一開始對所有字段建索引,減少不必要的成本。
操作指南?
第 1 步:評估資源?
在部署集群之前,首先應評估所需服務器硬件資源,包括以下幾個關鍵步驟:
- 評估寫入資源:計算公式如下:
平均寫入吞吐 = 日增數據量 / 86400 s
峰值寫入吞吐 = 平均寫入吞吐 * 寫入吞吐峰值 / 均值比
峰值寫入所需 CPU 核數 = 峰值寫入吞吐 / 單核寫入吞吐
-
評估存儲資源:計算公式為?
所需存儲空間 = 日增數據量 / 壓縮率 * 副本數 * 數據存儲周期
-
評估查詢資源:查詢的資源消耗隨查詢量和復雜度而異,建議初始預留 50% 的 CPU 資源用于查詢,再根據實際測試情況進行調整。
-
匯總整合資源:由第 1 步和第 3 步估算出所需 CPU 核數后,除以單機 CPU 核數,估算出 BE 服務器數量,再根據 BE 服務器數量和第 2 步的結果,估算出每臺 BE 服務器所需存儲空間,然后分攤到 4~12 塊數據盤,計算出單盤存儲容量。
以每天新增 100 TB 數據量(壓縮前)、5 倍壓縮率、1 副本、熱數據存儲 3 天、冷數據存儲 30 天、寫入吞吐峰值 / 均值比 200%、單核寫入吞吐 10 MB/s、查詢預留 50% CPU 資源為例,可估算出:
- FE:3 臺服務器,每臺配置 16 核 CPU、64 GB 內存、1 塊 100 GB SSD 盤
- BE:15 臺服務器,每臺配置 32 核 CPU、256 GB 內存、10 塊 600 GB SSD 盤
- S3 對象存儲空間:即為預估冷數據存儲空間,600 TB
該例子中,各關鍵指標的值及具體計算方法可見下表:
關鍵指標(單位) | 值 | 說明 |
---|---|---|
日增數據量(TB) | 100 | 根據實際需求填寫 |
壓縮率 | 5 | 一般為 3~10 倍(含索引),根據實際需求填寫 |
副本數 | 1 | 根據實際需求填寫,默認 1 副本,可選值:1,2,3 |
熱數據存儲周期(天) | 3 | 根據實際需求填寫 |
冷數據存儲周期(天) | 30 | 根據實際需求填寫 |
總存儲周期(天) | 33 | 算法:熱數據存儲周期 + 冷數據存儲周期 |
預估熱數據存儲空間(TB) | 60 | 算法:日增數據量 / 壓縮率 * 副本數 * 熱數據存儲周期 |
預估冷數據存儲空間(TB) | 600 | 算法:日增數據量 / 壓縮率 * 副本數 * 冷數據存儲周期 |
寫入吞吐峰值 / 均值比 | 200% | 根據實際需求填寫,默認 200% |
單機 CPU 核數 | 32 | 根據實際需求填寫,默認 32 核 |
平均寫入吞吐(MB/s) | 1214 | 算法:日增數據量 / 86400 s |
峰值寫入吞吐(MB/s) | 2427 | 算法:平均寫入吞吐 * 寫入吞吐峰值 / 均值比 |
峰值寫入所需 CPU 核數 | 242.7 | 算法:峰值寫入吞吐 / 單核寫入吞吐 |
查詢預留 CPU 百分比 | 50% | 根據實際需求填寫,默認 50% |
預估 BE 服務器數 | 15.2 | 算法:峰值寫入所需 CPU 核數 / 單機 CPU 核數 /(1 - 查詢預留 CPU 百分比) |
預估 BE 服務器數取整 | 15 | 算法:MAX (副本數,預估 BE 服務器數取整) |
預估每臺 BE 服務器存儲空間(TB) | 5.7 | 算法:預估熱數據存儲空間 / 預估 BE 服務器數 /(1 - 30%) ,其中,30% 是存儲空間預留值。建議每臺 BE 服務器掛載 4~12 塊數據盤,以提高 I/O 能力。 |
第 2 步:部署集群?
完成資源評估后,可以開始部署 Apache Doris 集群,推薦在物理機及虛擬機環境中進行部署。手動部署集群,可參考?手動部署。
第 3 步:優化 FE 和 BE 配置?
完成集群部署后,需分別優化 FE 和 BE 配置參數,以更加契合日志存儲與分析的場景。
優化 FE 配置
在?fe/conf/fe.conf
?目錄下找到 FE 的相關配置項,并按照以下表格,調整 FE 配置。
需調整參數 | 說明 |
---|---|
max_running_txn_num_per_db = 10000 | 高并發導入運行事務數較多,需調高參數。 |
streaming_label_keep_max_second = 3600 ?label_keep_max_second = 7200 | 高頻導入事務標簽內存占用多,保留時間調短。 |
enable_round_robin_create_tablet = true | 創建 Tablet 時,采用 Round Robin 策略,盡量均勻。 |
tablet_rebalancer_type = partition | 均衡 Tablet 時,采用每個分區內盡量均勻的策略。 |
autobucket_min_buckets = 10 | 將自動分桶的最小分桶數從 1 調大到 10,避免日志量增加時分桶不夠。 |
max_backend_heartbeat_failure_tolerance_count = 10 | 日志場景下 BE 服務器壓力較大,可能短時間心跳超時,因此將容忍次數從 1 調大到 10。 |
更多關于 FE 配置項的信息,可參考?FE 配置項。
優化 BE 配置
在?be/conf/be.conf
?目錄下找到 BE 的相關配置項,并按照以下表格,調整 BE 配置。
模塊 | 需調整參數 | 說明 |
---|---|---|
存儲 | storage_root_path = /path/to/dir1;/path/to/dir2;...;/path/to/dir12 | 配置熱數據在磁盤目錄上的存儲路徑。 |
- | enable_file_cache = true | 開啟文件緩存。 |
- | file_cache_path = [{"path": "/mnt/datadisk0/file_cache", "total_size":53687091200, "query_limit": "10737418240"},{"path": "/mnt/datadisk1/file_cache", "total_size":53687091200,"query_limit": "10737418240"}] | 配置冷數據的緩存路徑和相關設置,具體配置說明如下:path :緩存路徑total_size :該緩存路徑的總大小,單位為字節,53687091200 字節等于 50 GBquery_limit :單次查詢可以從緩存路徑中查詢的最大數據量,單位為字節,10737418240 字節等于 10 GB |
寫入 | write_buffer_size = 1073741824 | 增加寫入緩沖區(buffer)的文件大小,減少小文件和隨機 I/O 操作,提升性能。 |
- | max_tablet_version_num = 20000 | 配合建表的 time_series compaction 策略,允許更多版本暫時未合并。 |
Compaction | max_cumu_compaction_threads = 8 | 設置為 CPU 核數 / 4,意味著 CPU 資源的 1/4 用于寫入,1/4 用于后臺 Compaction,2/1 留給查詢和其他操作。 |
- | inverted_index_compaction_enable = true | 開啟索引合并(index compaction),減少 Compaction 時的 CPU 消耗。 |
- | enable_segcompaction = false ?enable_ordered_data_compaction = false | 關閉日志場景不需要的兩個 Compaction 功能。 |
- | enable_compaction_priority_scheduling = false | 低優先級 compaction 在一塊盤上限制 2 個任務,會影響 compaction 速度。 |
- | total_permits_for_compaction_score = 200000 | 該參數用來控制內存,time series 策略下本身可以控制內存。 |
緩存 | disable_storage_page_cache = true ?inverted_index_searcher_cache_limit = 30% | 因為日志數據量較大,緩存(cache)作用有限,因此關閉數據緩存,調換為索引緩存(index cache)的方式。 |
- | inverted_index_cache_stale_sweep_time_sec = 3600 ?index_cache_entry_stay_time_after_lookup_s = 3600 | 讓索引緩存在內存中盡量保留 1 小時。 |
- | enable_inverted_index_cache_on_cooldown = true enable_write_index_searcher_cache = false | 開啟索引上傳冷數據存儲時自動緩存的功能。 |
- | tablet_schema_cache_recycle_interval = 3600 ?segment_cache_capacity = 20000 | 減少其他緩存對內存的占用。 |
- | inverted_index_ram_dir_enable = true | 減少寫入時索引臨時文件帶來的 IO 開銷。 |
線程 | pipeline_executor_size = 24 ?doris_scanner_thread_pool_thread_num = 48 | 32 核 CPU 的計算線程和 I/O 線程配置,根據核數等比擴縮。 |
- | scan_thread_nice_value = 5 | 降低查詢 I/O 線程的優先級,保證寫入性能和時效性。 |
其他 | string_type_length_soft_limit_bytes = 10485760 | 將 String 類型數據的長度限制調高至 10 MB。 |
- | trash_file_expire_time_sec = 300 ?path_gc_check_interval_second = 900 ?path_scan_interval_second = 900 | 調快垃圾文件的回收時間。 |
更多關于 BE 配置項的信息,可參考?BE 配置項。
第 4 步:建表?
由于日志數據的寫入和查詢都具備明顯的特征,因此,在建表時按照本節說明進行針對性配置,以提升性能表現。
配置分區分桶參數
分區時,按照以下說明配置:
- 使用時間字段上的 (./table-design/data-partitioning/manual-partitioning.md#range-分區) (
PARTITION BY RANGE(
ts)
),并開啟?動態分區?("dynamic_partition.enable" = "true"
),按天自動管理分區。 - 使用 Datetime 類型的時間字段作為 Key (
DUPLICATE KEY(ts)
),在查詢最新 N 條日志時有數倍加速。
分桶時,按照以下說明配置:
- 分桶數量大致為集群磁盤總數的 3 倍,每個桶的數據量壓縮后 5GB 左右。
- 使用 Random 策略 (
DISTRIBUTED BY RANDOM BUCKETS 60
),配合寫入時的 Single Tablet 導入,可以提升批量(Batch)寫入的效率。
更多關于分區分桶的信息,可參考?數據劃分。
配置壓縮參數
- 使用 zstd 壓縮算法 (
"compression" = "zstd"
), 提高數據壓縮率。
配置 Compaction 參數
按照以下說明配置 Compaction 參數:
- 使用 time_series 策略 (
"compaction_policy" = "time_series"
),以減輕寫放大效應,對于高吞吐日志寫入的資源寫入很重要。
建立和配置索引參數
按照以下說明操作:
- 對經常查詢的字段建立索引 (
USING INVERTED
)。 - 對需要全文檢索的字段,將分詞器(parser)參數賦值為 unicode,一般能滿足大部分需求。如有支持短語查詢的需求,將 support_phrase 參數賦值為 true;如不需要,則設置為 false,以降低存儲空間。
配置存儲策略
按照以下說明操作:
- 對于熱存儲數據,如果使用云盤,可配置 1 副本;如果使用物理盤,則至少配置 2 副本 (
"replication_num" = "2"
)。 - 配置?
log_s3
?的存儲位置 (CREATE RESOURCE "log_s3"
),并設置?log_policy_3day
?冷熱數據分層策略 (CREATE STORAGE POLICY log_policy_3day
),即在超過 3 天后將數據冷卻至?log_s3
?指定的存儲位置。可參考以下代碼:
CREATE DATABASE log_db;
USE log_db;CREATE RESOURCE "log_s3"
PROPERTIES
("type" = "s3","s3.endpoint" = "your_endpoint_url","s3.region" = "your_region","s3.bucket" = "your_bucket","s3.root.path" = "your_path","s3.access_key" = "your_ak","s3.secret_key" = "your_sk"
);CREATE STORAGE POLICY log_policy_3day
PROPERTIES("storage_resource" = "log_s3","cooldown_ttl" = "259200"
);CREATE TABLE log_table
(`ts` DATETIME,`host` TEXT,`path` TEXT,`message` TEXT,INDEX idx_host (`host`) USING INVERTED,INDEX idx_path (`path`) USING INVERTED,INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`ts`)
PARTITION BY RANGE(`ts`) ()
DISTRIBUTED BY RANDOM BUCKETS 60
PROPERTIES ("compression" = "zstd","compaction_policy" = "time_series","dynamic_partition.enable" = "true","dynamic_partition.create_history_partition" = "true","dynamic_partition.time_unit" = "DAY","dynamic_partition.start" = "-30","dynamic_partition.end" = "1","dynamic_partition.prefix" = "p","dynamic_partition.buckets" = "60","dynamic_partition.replication_num" = "2", -- 存算分離不需要"replication_num" = "2", -- 存算分離不需要"storage_policy" = "log_policy_3day" -- 存算分離不需要
);
第 5 步:采集日志?
完成建表后,可進行日志采集。
Apache Doris 提供開放、通用的 Stream HTTP APIs,通過這些 APIs,你可與常用的日志采集器打通,包括 Logstash、Filebeat、Kafka 等,從而開展日志采集工作。本節介紹了如何使用 Stream HTTP APIs 對接日志采集器。
對接 Logstash
按照以下步驟操作:
- 下載并安裝 Logstash Doris Output 插件。你可選擇以下兩種方式之一:
-
直接下載:點此下載。
-
從源碼編譯,并運行下方命令安裝:
./bin/logstash-plugin install logstash-output-doris-1.0.0.gem
- 配置 Logstash。需配置以下參數:
logstash.yml
:配置 Logstash 批處理日志的條數和時間,用于提升數據寫入性能。
pipeline.batch.size: 1000000
pipeline.batch.delay: 10000
logstash_demo.conf
:配置所采集日志的具體輸入路徑和輸出到 Apache Doris 的設置。
input { file { path => "/path/to/your/log" }
} output { doris { http_hosts => [ "<http://fehost1:http_port>", "<http://fehost2:http_port>", "<http://fehost3:http_port">] user => "your_username" password => "your_password" db => "your_db" table => "your_table" # doris stream load http headers headers => { "format" => "json" "read_json_by_line" => "true" "load_to_single_tablet" => "true" } # field mapping: doris fileld name => logstash field name # %{} to get a logstash field, [] for nested field such as [host][name] for host.name mapping => { "ts" => "%{@timestamp}" "host" => "%{[host][name]}" "path" => "%{[log][file][path]}" "message" => "%{message}" } log_request => true log_speed_interval => 10 }
}
- 按照下方命令運行 Logstash,采集日志并輸出至 Apache Doris。
./bin/logstash -f logstash_demo.conf
更多關于 Logstash 配置和使用的說明,可參考?Logstash Doris Output Plugin。
對接 Filebeat
按照以下步驟操作:
- 獲取支持輸出至 Apache Doris 的 Filebeat 二進制文件。可?點此下載?或者從 Apache Doris 源碼編譯。
- 配置 Filebeat。需配置以下參數:
-
filebeat_demo.yml
:配置所采集日志的具體輸入路徑和輸出到 Apache Doris 的設置。# input filebeat.inputs: - type: logenabled: truepaths:- /path/to/your/log# multiline 可以將跨行的日志(比如 Java stacktrace)拼接起來multiline:type: pattern# 效果:以 yyyy-mm-dd HH:MM:SS 開頭的行認為是一條新的日志,其他都拼接到上一條日志pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}'negate: truematch: afterskip_newline: trueprocessors: # 用 js script 插件將日志中的 \t 替換成空格,避免 JSON 解析報錯 - script:lang: javascriptsource: >function process(event) {var msg = event.Get("message");msg = msg.replace(/\t/g, " ");event.Put("message", msg);} # 用 dissect 插件做簡單的日志解析 - dissect:# 2024-06-08 18:26:25,481 INFO (report-thread|199) [ReportHandler.cpuReport():617] begin to handletokenizer: "%{day} %{time} %{log_level} (%{thread}) [%{position}] %{content}"target_prefix: ""ignore_failure: trueoverwrite_keys: true# queue and batch queue.mem:events: 1000000flush.min_events: 100000flush.timeout: 10s# output output.doris:fenodes: [ "http://fehost1:http_port", "http://fehost2:http_port", "http://fehost3:http_port" ]user: "your_username"password: "your_password"database: "your_db"table: "your_table"# output string format## %{[agent][hostname]} %{[log][file][path]} 是filebeat自帶的metadata## 常用的 filebeat metadata 還是有采集時間戳 %{[@timestamp]}## %{[day]} %{[time]} 是上面 dissect 解析得到字段codec_format_string: '{"ts": "%{[day]} %{[time]}", "host": "%{[agent][hostname]}", "path": "%{[log][file][path]}", "message": "%{[message]}"}'headers:format: "json"read_json_by_line: "true"load_to_single_tablet: "true"
- 按照下方命令運行 Filebeat,采集日志并輸出至 Apache Doris。
chmod +x filebeat-doris-1.0.0
./filebeat-doris-1.0.0 -c filebeat_demo.yml
更多關于 Filebeat 配置和使用的說明,可參考?Beats Doris Output Plugin。
對接 Kafka
將 JSON 格式的日志寫入 Kafka 的消息隊列,創建 Kafka Routine Load,即可讓 Apache Doris 從 Kafka 主動拉取數據。
可參考如下示例。其中,property.*
?是 Librdkafka 客戶端相關配置,根據實際 Kafka 集群情況配置。
-- 準備好 kafka 集群和 topic log__topic_
-- 創建 routine load,從 kafka log__topic_將數據導入 log_table 表
CREATE ROUTINE LOAD load_log_kafka ON log_db.log_table
COLUMNS(ts, clientip, request, status, size)
PROPERTIES (
"max_batch_interval" = "10",
"max_batch_rows" = "1000000",
"max_batch_size" = "109715200",
"load_to_single_tablet" = "true",
"timeout" = "600",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA (
"kafka_broker_list" = "host:port",
"kafka_topic" = "log__topic_",
"property.group.id" = "your_group_id",
"property.security.protocol"="SASL_PLAINTEXT",
"property.sasl.mechanism"="GSSAPI",
"property.sasl.kerberos.service.name"="kafka",
"property.sasl.kerberos.keytab"="/path/to/xxx.keytab",
"property.sasl.kerberos.principal"="<xxx@yyy.com>"
);
-- 查看 routine 的狀態
SHOW ROUTINE LOAD;
更多關于 Kafka 配置和使用的說明,可參考?Routine Load。
使用自定義程序采集日志
除了對接常用的日志采集器以外,你也可以自定義程序,通過 HTTP API Stream Load 將日志數據導入 Apache Doris。參考以下代碼:
curl
--location-trusted
-u username:password
-H "format:json"
-H "read_json_by_line:true"
-H "load_to_single_tablet:true"
-H "timeout:600"
-T logfile.json
http://fe_host:fe_http_port/api/log_db/log_table/_stream_load
在使用自定義程序時,需注意以下關鍵點:
- 使用 Basic Auth 進行 HTTP 鑒權,用命令?
echo -n 'username:password' | base64
?進行計算。 - 設置 HTTP header "format:json",指定數據格式為 JSON。
- 設置 HTTP header "read_json_by_line:true",指定每行一個 JSON。
- 設置 HTTP header "load_to_single_tablet:true",指定一次導入寫入一個分桶減少導入的小文件。
- 建議寫入客戶端一個 Batch 的大小為 100MB ~ 1GB。如果你使用的是 Apache Doris 2.1 及更高版本,需通過服務端 Group Commit 功能,降低客戶端 Batch 大小。
第 6 步:查詢和分析日志?
日志查詢
Apache Doris 支持標準 SQL,因此,你可以通過 MySQL 客戶端或者 JDBC 等方式連接到集群,執行 SQL 進行日志查詢。參考以下命令:
mysql -h fe_host -P fe_mysql_port -u your_username -Dyour_db_name
下方列出常見的 5 條 SQL 查詢命令,以供參考:
- 查看最新的 10 條數據
SELECT * FROM your_table_name ORDER BY ts DESC LIMIT 10;
- 查詢?
host
?為?8.8.8.8
?的最新 10 條數據
SELECT * FROM your_table_name WHERE host = '8.8.8.8' ORDER BY ts DESC LIMIT 10;
- 檢索請求字段中有?
error
?或者?404
?的最新 10 條數據。其中,MATCH_ANY
?是 Apache Doris 全文檢索的 SQL 語法,用于匹配參數中任一關鍵字。
SELECT * FROM your_table_name WHERE message MATCH_ANY 'error 404'
ORDER BY ts DESC LIMIT 10;
- 檢索請求字段中有?
image
?和?faq
?的最新 10 條數據。其中,MATCH_ALL
?是 Apache Doris 全文檢索的 SQL 語法,用于匹配參數中所有關鍵字。
SELECT * FROM your_table_name WHERE message MATCH_ALL 'image faq'
ORDER BY ts DESC LIMIT 10;
- 檢索請求字段中有?
image
?和?faq
?的最新 10 條數據。其中,MATCH_PHRASE
?是 Apache Doris 全文檢索的 SQL 語法,用于匹配參數中所有關鍵字,并且要求順序一致。在下方例子中,a image faq b
?能匹配,但是?a faq image b
?不能匹配,因為?image
?和?faq
?的順序與查詢不一致。
SELECT * FROM your_table_name WHERE message MATCH_PHRASE 'image faq'
ORDER BY ts DESC LIMIT 10;
可視化日志分析
一些第三方廠商提供了基于 Apache Doris 的可視化日志分析開發平臺,包含類 Kibana Discover 的日志檢索分析界面,提供直觀、易用的探索式日志分析交互。
- 支持全文檢索和 SQL 兩種模式
- 支持時間框和直方圖上選擇查詢日志的時間段
- 支持信息豐富的日志明細展示,還可以展開成 JSON 或表格
- 在日志數據上下文交互式點擊增加和刪除篩選條件
- 搜索結果的字段 Top 值展示,便于發現異常值和進一步下鉆分析