場景解析
在企業網絡安全日志處理場景中,防火墻、入侵檢測系統(IDS)等設備會持續產生大量日志,記錄網絡流量、訪問請求、異常事件等基礎信息,但這些原始日志僅能呈現表面現象,難以全面剖析安全威脅,需要在日志處理過程中引入外部數據增強安全分析能力。
例如,某天 IDS(入侵檢測系統) 日志記錄到多個來自同一 IP 地址的異常端口掃描行為,原始日志僅顯示時間、源 IP、掃描端口等信息,無法判斷該 IP 是否為惡意攻擊源,也不清楚其背后的攻擊意圖。此時,就需要引用外部數據來豐富日志內容。可以從威脅情報平臺獲取該 IP 地址的歷史攻擊記錄、所屬的惡意組織標簽等數據,從地理位置數據庫獲取其所在地區、網絡服務提供商等信息,并將這些外部數據與原始日志進行關聯整合。
經過數據融合后,原本孤立的日志事件就有了更豐富的背景信息。安全人員可以通過這些豐富的日志數據,快速判斷該 IP 地址是否屬于已知的惡意攻擊源,是否存在特定的攻擊目標偏好,進而采取更精準的安全防護措施,如封禁 IP、加強對應端口的防護策略等。同時,還能基于這些數據對攻擊模式進行分析,預測潛在的安全風險,提前完善安全防御體系,提升企業整體的網絡安全防護能力。
方案解析
觀測云 Pipeline 是一個可編程數據處理器,使用觀測云開源的 Platypus 語言作為運行時,能夠在邊緣節點進行大規模數據分析和特征提取。Datakit Pipeline 提供以下兩個內置函數用于從外部表引用數據:
- query_refer_table(),函數原型為?
fn query_refer_table(table_name: str, key: str, value)
,它能夠查詢 table_name 表中 key 列為 value 的行,將返回的首行數據豐富至日志中; - mquery_refer_table(),函數原型為?
fn mquery_refer_table(table_name: str, keys: list, values: list)
,相比 query_refer_table(),該函數能夠使用多個列和值對 table_name 表進行查詢。
在以上函數的支持下,Pipeline 能夠使用外部表對安全日志進行豐富,以 Zeek conn.log 為例,豐富前的日志如下:
{"ts": 1591367999.3059881,"uid": "CMdzit1AMNsmfAIiQc","id.orig_h": "192.168.4.76", # 源 IP"id.orig_p": 36844,"id.resp_h": "192.168.4.1","id.resp_p": 53,"proto": "udp","service": "dns","duration": 0.06685185432434082,"orig_bytes": 62,"resp_bytes": 141,"conn_state": "SF","missed_bytes": 0,"history": "Dd","orig_pkts": 2,"orig_ip_bytes": 118,"resp_pkts": 2,"resp_ip_bytes": 197,"ip_proto": 17
}
假設在 Pipeline 中引用了風險情報表,此表中記錄了危險 IP,包含 IP 列和信息列,當源 IP id.orig_h 字段的值能夠匹配到風險表中 IP 列的值時,就會為此日志豐富風險信息字段,豐富后的日志如下:
{"ts": 1591367999.3059881,"uid": "CMdzit1AMNsmfAIiQc","id.orig_h": "192.168.4.76", # 源 IP"id.orig_p": 36844,"id.resp_h": "192.168.4.1","id.resp_p": 53,"proto": "udp","service": "dns","duration": 0.06685185432434082,"orig_bytes": 62,"resp_bytes": 141,"conn_state": "SF","missed_bytes": 0,"history": "Dd","orig_pkts": 2,"orig_ip_bytes": 118,"resp_pkts": 2,"resp_ip_bytes": 197,"ip_proto": 17,"risk_ip": "192.168.4.76", # 豐富風險信息字段"risk_info": "此 IP 近期發起大量攻擊" # 豐富風險信息字段
}
在對日志完成豐富后就可以在觀測云過濾存在 risk_info 字段的日志進行告警和特征分析,具體的分析方法和場景根據豐富的字段擴展,例如豐富了風險 IP 的地理位置和運營商信息后就能在觀測云儀表盤中以地圖的方式呈現攻擊來源。
在外部表管理方面,Datakit 從 refer_table_url 中以指定間隔拉取外部表數據供 Pipeline 使用,外部表數據必須組織為以下格式:
[{"table_name": "table_abc","column_name": ["col", "col2", "col3", "col4"],"column_type": ["string", "float", "int", "bool"],"row_data": [["a", 123, "123", "false"],["ab", "1234.", "123", true],["ab", "1234.", "1235", "false"]]},{"table_name": "table_ijk","column_name": ["name", "id"],"column_type": ["string", "string"],"row_data": [["a", "12"],["a", "123"],["ab", "1234"]]}
]
也就是說必須提供一個 HTTP/HTTPS 端點暴露表數據,可以使用 Nginx 托管 JSON 文件的方式,但是考慮到更靈活的集成能力,推薦內網部署觀測云 DataFlux Func,Func 是一個函數開發、管理、執行平臺,可將集成了威脅平臺的 Python 函數暴露為拉取表數據的端點,也就是說,當 Datakit 從此端點同步數據時會觸發腳本運行,腳本將從一個或者多個平臺獲取數據并組裝為指定的格式。整體架構如下:
注意:該功能內存消耗較高,以 150 萬行(refer_table 行數)、磁盤占用約 200MB(JSON 文件)的不重復數據(string 類型兩列;int,float,bool 各一列)為例,內存占用維持在 950MB ~ 1.2GB,更新時的峰值內存 2.2GB ~ 2.7GB,可通過配置 use_sqlite = true,將數據保存到磁盤上(即使用 SQLite 存儲數據,而不是內存)。
演示用例
前置條件
假設用戶具備以下條件:
- 部署了 DataKit 的 Linux 主機;
- 部署了 DataFlux Func。
配置 Func
在 Func 中新建腳本集 “Pipeline 外部表 Demo”,新建 main 文件,寫入以下函數后發布:
@DFF.API('外部表', cache_result=3000, timeout=10)
def refer_table():'''返回符合 Pipeline query_refer_table() 和 mquery_refer_table() 函數格式要求的表數據。'''data = [{"table_name": "risk_ip","column_name": ["risk_ip", "risk_info"],"column_type": ["string", "string"],"row_data": [["180.173.79.213", "屬于 xxx 組織的惡意 IP,存在端口和漏洞掃描行為"],["180.173.79.214", "屬于 xxx 組織的惡意 IP,存在病毒傳播行為"],]}]print('執行同步請求')return data
在 Func 管理頁面中新建同步 API,將此函數暴露為接口,為 DataKit 提供外部數據,點擊“示例”即可查看接口 URL:
在 Shell 中請求此 URL 即可獲得數據:
配置 DataKit 拉取外部表
編輯 DataKit 配置文件:
vim /usr/local/datakit/conf.d/datakit.conf
修改以下配置:
[pipeline]# 將 <YOUR-FUNC-API-URL> 替換為 Func 同步 API 的 URLrefer_table_url = <YOUR-FUNC-API-URL>refer_table_pull_interval = "5m"use_sqlite = truesqlite_mem_mode = false
重啟 DataKit 使配置生效:
datakit service -R
配置示例日志
執行以下命令,使用腳本生成測試日志:
# 創建測試目錄
mkdir -p ~/workspace/log_demo && cd $_# 創建腳本
cat > gen_log.sh << 'EOF'
#!/bin/bashwhile true; dotimestamp=$(date +%s.%N | cut -c1-17)ips=("180.173.79.213" "180.173.79.214")log="{\"ts\":${timestamp},\"uid\":\"CMdzit1AMNsmfAIiQc\",\"id.orig_h\":\"${ips[$((RANDOM % 2))]}\",\"id.orig_p\":36844,\"id.resp_h\":\"192.168.4.1\",\"id.resp_p\":53,\"proto\":\"udp\",\"service\":\"dns\",\"duration\":0.06685185432434082,\"orig_bytes\":62,\"resp_bytes\":141,\"conn_state\":\"SF\",\"missed_bytes\":0,\"history\":\"Dd\",\"orig_pkts\":2,\"orig_ip_bytes\":118,\"resp_pkts\":2,\"resp_ip_bytes\":197,\"ip_proto\":17}"echo "${log}" >> ./demo.logecho "++ gen log: ${log}"sleep 1
done
EOF# 運行腳本,將在當前目錄下生成日志文件 demo.log
bash gen_log.sh
配置 DataKit 采集示例日志
配置 DataKit 日志采集插件:
cd /usr/local/datakit/conf.d/log
cp logging.conf.sample demo.conf
vim demo.conf
修改以下配置:
[[inputs.logging]]# 日志文件路徑logfiles = ["/root/workspace/log_demo/demo.log",]# 日志來源source = "demo"
重啟 DataKit 使配置生效:
datakit service -R
登錄觀測云點擊【日志】,可見 demo.log 已經被采集。
配置 Pipeline
點擊【Pipelines】-【新建 Pipeline】,運行模式選擇“本地 Pipeline”,索引選擇 “default”,日志來源選擇 “demo”,Pipeline 名稱填寫 “demo”,在“定義解析規則”中輸入以下腳本后點擊保存:
# 將 JSON 字符串轉換為對象
obj = load_json(_)# 從 JSON 對象中提取字段并處理
# 原始日志使用時間戳標記日志時間,可讀性差,提取此字段并轉換為人類易讀的格式
pt_kvs_set("ts_date", obj["ts"]*1000000)
datetime(ts_date, "us", "RFC3339Nano", "Asia/Shanghai")# 從日志中提取源 IP,并根據源 IP 從外部表中豐富字段
pt_kvs_set("src_ip", obj["id.orig_h"])
# query_refer_table 函數的參數分別為外部表名、列名、列值
query_refer_table("risk_ip", "risk_ip", src_ip)
效果確認
Pipeline 保存后約 1 分鐘后生效,生效后新增的字段需等待服務端刷新字段后才可查詢,刷新完成后查看日志列表,已經豐富了相關字段。
可以快速分析風險的趨勢和占比。
配置告警后,可在風險發生時及時告警。
如果在告警中關聯創建異常追蹤,可閉環對安全風險的處理過程。