Pipeline 引用外部數據源最佳實踐

場景解析

在企業網絡安全日志處理場景中,防火墻、入侵檢測系統(IDS)等設備會持續產生大量日志,記錄網絡流量、訪問請求、異常事件等基礎信息,但這些原始日志僅能呈現表面現象,難以全面剖析安全威脅,需要在日志處理過程中引入外部數據增強安全分析能力。

例如,某天 IDS(入侵檢測系統) 日志記錄到多個來自同一 IP 地址的異常端口掃描行為,原始日志僅顯示時間、源 IP、掃描端口等信息,無法判斷該 IP 是否為惡意攻擊源,也不清楚其背后的攻擊意圖。此時,就需要引用外部數據來豐富日志內容。可以從威脅情報平臺獲取該 IP 地址的歷史攻擊記錄、所屬的惡意組織標簽等數據,從地理位置數據庫獲取其所在地區、網絡服務提供商等信息,并將這些外部數據與原始日志進行關聯整合。

經過數據融合后,原本孤立的日志事件就有了更豐富的背景信息。安全人員可以通過這些豐富的日志數據,快速判斷該 IP 地址是否屬于已知的惡意攻擊源,是否存在特定的攻擊目標偏好,進而采取更精準的安全防護措施,如封禁 IP、加強對應端口的防護策略等。同時,還能基于這些數據對攻擊模式進行分析,預測潛在的安全風險,提前完善安全防御體系,提升企業整體的網絡安全防護能力。

方案解析

觀測云 Pipeline 是一個可編程數據處理器,使用觀測云開源的 Platypus 語言作為運行時,能夠在邊緣節點進行大規模數據分析和特征提取。Datakit Pipeline 提供以下兩個內置函數用于從外部表引用數據:

  • query_refer_table(),函數原型為?fn query_refer_table(table_name: str, key: str, value),它能夠查詢 table_name 表中 key 列為 value 的行,將返回的首行數據豐富至日志中;
  • mquery_refer_table(),函數原型為?fn mquery_refer_table(table_name: str, keys: list, values: list),相比 query_refer_table(),該函數能夠使用多個列和值對 table_name 表進行查詢。

在以上函數的支持下,Pipeline 能夠使用外部表對安全日志進行豐富,以 Zeek conn.log 為例,豐富前的日志如下:

{"ts": 1591367999.3059881,"uid": "CMdzit1AMNsmfAIiQc","id.orig_h": "192.168.4.76", # 源 IP"id.orig_p": 36844,"id.resp_h": "192.168.4.1","id.resp_p": 53,"proto": "udp","service": "dns","duration": 0.06685185432434082,"orig_bytes": 62,"resp_bytes": 141,"conn_state": "SF","missed_bytes": 0,"history": "Dd","orig_pkts": 2,"orig_ip_bytes": 118,"resp_pkts": 2,"resp_ip_bytes": 197,"ip_proto": 17
}

假設在 Pipeline 中引用了風險情報表,此表中記錄了危險 IP,包含 IP 列和信息列,當源 IP id.orig_h 字段的值能夠匹配到風險表中 IP 列的值時,就會為此日志豐富風險信息字段,豐富后的日志如下:

{"ts": 1591367999.3059881,"uid": "CMdzit1AMNsmfAIiQc","id.orig_h": "192.168.4.76", # 源 IP"id.orig_p": 36844,"id.resp_h": "192.168.4.1","id.resp_p": 53,"proto": "udp","service": "dns","duration": 0.06685185432434082,"orig_bytes": 62,"resp_bytes": 141,"conn_state": "SF","missed_bytes": 0,"history": "Dd","orig_pkts": 2,"orig_ip_bytes": 118,"resp_pkts": 2,"resp_ip_bytes": 197,"ip_proto": 17,"risk_ip": "192.168.4.76", # 豐富風險信息字段"risk_info": "此 IP 近期發起大量攻擊" # 豐富風險信息字段
}

在對日志完成豐富后就可以在觀測云過濾存在 risk_info 字段的日志進行告警和特征分析,具體的分析方法和場景根據豐富的字段擴展,例如豐富了風險 IP 的地理位置和運營商信息后就能在觀測云儀表盤中以地圖的方式呈現攻擊來源。

在外部表管理方面,Datakit 從 refer_table_url 中以指定間隔拉取外部表數據供 Pipeline 使用,外部表數據必須組織為以下格式:

[{"table_name": "table_abc","column_name": ["col", "col2", "col3", "col4"],"column_type": ["string", "float", "int", "bool"],"row_data": [["a", 123, "123", "false"],["ab", "1234.", "123", true],["ab", "1234.", "1235", "false"]]},{"table_name": "table_ijk","column_name": ["name", "id"],"column_type": ["string", "string"],"row_data": [["a", "12"],["a", "123"],["ab", "1234"]]}
]

也就是說必須提供一個 HTTP/HTTPS 端點暴露表數據,可以使用 Nginx 托管 JSON 文件的方式,但是考慮到更靈活的集成能力,推薦內網部署觀測云 DataFlux Func,Func 是一個函數開發、管理、執行平臺,可將集成了威脅平臺的 Python 函數暴露為拉取表數據的端點,也就是說,當 Datakit 從此端點同步數據時會觸發腳本運行,腳本將從一個或者多個平臺獲取數據并組裝為指定的格式。整體架構如下:

注意:該功能內存消耗較高,以 150 萬行(refer_table 行數)、磁盤占用約 200MB(JSON 文件)的不重復數據(string 類型兩列;int,float,bool 各一列)為例,內存占用維持在 950MB ~ 1.2GB,更新時的峰值內存 2.2GB ~ 2.7GB,可通過配置 use_sqlite = true,將數據保存到磁盤上(即使用 SQLite 存儲數據,而不是內存)。

演示用例

前置條件

假設用戶具備以下條件:

  • 部署了 DataKit 的 Linux 主機;
  • 部署了 DataFlux Func。

配置 Func

在 Func 中新建腳本集 “Pipeline 外部表 Demo”,新建 main 文件,寫入以下函數后發布:

@DFF.API('外部表', cache_result=3000, timeout=10)
def refer_table():'''返回符合 Pipeline query_refer_table() 和 mquery_refer_table() 函數格式要求的表數據。'''data = [{"table_name": "risk_ip","column_name": ["risk_ip", "risk_info"],"column_type": ["string", "string"],"row_data": [["180.173.79.213", "屬于 xxx 組織的惡意 IP,存在端口和漏洞掃描行為"],["180.173.79.214", "屬于 xxx 組織的惡意 IP,存在病毒傳播行為"],]}]print('執行同步請求')return data

在 Func 管理頁面中新建同步 API,將此函數暴露為接口,為 DataKit 提供外部數據,點擊“示例”即可查看接口 URL:

在 Shell 中請求此 URL 即可獲得數據:

配置 DataKit 拉取外部表

編輯 DataKit 配置文件:

vim /usr/local/datakit/conf.d/datakit.conf

修改以下配置:

[pipeline]# 將 <YOUR-FUNC-API-URL> 替換為 Func 同步 API 的 URLrefer_table_url = <YOUR-FUNC-API-URL>refer_table_pull_interval = "5m"use_sqlite = truesqlite_mem_mode = false

重啟 DataKit 使配置生效:

datakit service -R

配置示例日志

執行以下命令,使用腳本生成測試日志:

# 創建測試目錄
mkdir -p ~/workspace/log_demo && cd $_# 創建腳本
cat > gen_log.sh << 'EOF'
#!/bin/bashwhile true; dotimestamp=$(date +%s.%N | cut -c1-17)ips=("180.173.79.213" "180.173.79.214")log="{\"ts\":${timestamp},\"uid\":\"CMdzit1AMNsmfAIiQc\",\"id.orig_h\":\"${ips[$((RANDOM % 2))]}\",\"id.orig_p\":36844,\"id.resp_h\":\"192.168.4.1\",\"id.resp_p\":53,\"proto\":\"udp\",\"service\":\"dns\",\"duration\":0.06685185432434082,\"orig_bytes\":62,\"resp_bytes\":141,\"conn_state\":\"SF\",\"missed_bytes\":0,\"history\":\"Dd\",\"orig_pkts\":2,\"orig_ip_bytes\":118,\"resp_pkts\":2,\"resp_ip_bytes\":197,\"ip_proto\":17}"echo "${log}" >> ./demo.logecho "++ gen log: ${log}"sleep 1
done
EOF# 運行腳本,將在當前目錄下生成日志文件 demo.log
bash gen_log.sh

配置 DataKit 采集示例日志

配置 DataKit 日志采集插件:

cd /usr/local/datakit/conf.d/log
cp logging.conf.sample demo.conf
vim demo.conf

修改以下配置:

[[inputs.logging]]# 日志文件路徑logfiles = ["/root/workspace/log_demo/demo.log",]# 日志來源source = "demo"

重啟 DataKit 使配置生效:

datakit service -R

登錄觀測云點擊【日志】,可見 demo.log 已經被采集。

配置 Pipeline

點擊【Pipelines】-【新建 Pipeline】,運行模式選擇“本地 Pipeline”,索引選擇 “default”,日志來源選擇 “demo”,Pipeline 名稱填寫 “demo”,在“定義解析規則”中輸入以下腳本后點擊保存:

# 將 JSON 字符串轉換為對象
obj = load_json(_)# 從 JSON 對象中提取字段并處理
# 原始日志使用時間戳標記日志時間,可讀性差,提取此字段并轉換為人類易讀的格式
pt_kvs_set("ts_date", obj["ts"]*1000000)
datetime(ts_date, "us", "RFC3339Nano", "Asia/Shanghai")# 從日志中提取源 IP,并根據源 IP 從外部表中豐富字段
pt_kvs_set("src_ip", obj["id.orig_h"])
# query_refer_table 函數的參數分別為外部表名、列名、列值
query_refer_table("risk_ip", "risk_ip", src_ip)

效果確認

Pipeline 保存后約 1 分鐘后生效,生效后新增的字段需等待服務端刷新字段后才可查詢,刷新完成后查看日志列表,已經豐富了相關字段。

可以快速分析風險的趨勢和占比。

配置告警后,可在風險發生時及時告警。

如果在告警中關聯創建異常追蹤,可閉環對安全風險的處理過程。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913673.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913673.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913673.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

UI + MCP Client + MCP Server(并且鏈接多個Server)

項目結構前端項目--------->MCP Client----------->MCP Serverserver就不過多贅述了&#xff0c;他只是相當于添加了多個的tools 鏈接前后端 http.createServer創建一個服務器// ---------------------------------------------------------------- // server.js import …

香港站群服務器與普通香港服務器對比

在選擇香港服務器時&#xff0c;用戶常常會遇到"站群服務器"和"普通服務器"兩種選項&#xff0c;雖然它們都基于香港數據中心的基礎設施&#xff0c;但在 IP 地址配置、功能定位和管理復雜度、成本上存在顯著差異&#xff0c;理解這些差異有助于用戶根據實…

4.B樹和B+樹的區別?為什么MySQL選擇B+樹作為索引?

區別&#xff1a;1.數據存儲位置B樹每個節點都存儲了索引和數據B樹只有葉子節點存儲數據&#xff0c;非葉子節點僅存儲索引2.葉子節點的鏈接B樹的所有葉子節點通過指針連接成一個雙向鏈表&#xff0c;可以高效地進行范圍查詢或者順序遍歷B樹則沒有這樣的連接關系&#xff0c;查…

轉換狂魔,Modbus TCP轉Profinet網關打通視覺傳感線連接之路

在汽車零部件沖壓生產線的世界中&#xff0c;液壓機的壓力穩定性是確保產品質量的秘密武器。然而&#xff0c;舊時代的人工巡檢和傳統監測方式卻好似拖累現代化進程的沉重枷鎖&#xff1a;效率低、成本高&#xff0c;還總是趕不上實時反饋的快車。這時&#xff0c;工廠決心大刀…

C++進階—二叉樹進階

第一章&#xff1a;內容安排說明 map和set特性需要先鋪墊二叉搜索樹&#xff0c;而二叉搜索樹也是一種樹形結構二叉搜索樹的特性了解&#xff0c;有助于更好的理解map和set的特性二叉樹中部分面試題稍微有點難度&#xff0c;在前面講解大家不容易接受&#xff0c;且時間長容易…

驅動下一代E/E架構的神經脈絡進化—10BASE-T1S

汽車電子電氣架構的演進正經歷一場深刻的變革&#xff0c;“中央計算單元區域控制器”的架構模式已成為當前主流車型平臺發展的明確方向。這種從傳統的“功能域”&#xff08;Domain&#xff09;架構向“區域”&#xff08;Zonal&#xff09;架構的轉型升級&#xff0c;旨在實現…

某學校系統中挖礦病毒應急排查

本篇文章主要記錄某學校長期未運營維護的程序&#xff0c;被黑客發現了漏洞&#xff0c;但好在學校有全流量設備&#xff0c;抓取到了過程中的流量包 需要你進行上機以及結合流量分析&#xff0c;排查攻擊者利用的漏洞以及上傳利用成功的木馬 文章目錄靶機介紹1.使用工具分析共…

vue 、react前端頁面支持縮放,echarts、地圖點擊左邊不準的原因和解決辦法

原因 由于以上都是通過canvas畫布生成的&#xff0c;一旦初始化&#xff0c;就會按照比例進行縮放&#xff0c;但與此同時&#xff0c;比例尺并沒有變化&#xff0c;導致坐標偏移 解決辦法 設置一個zoomVal產量&#xff0c;在頁面加載時計算縮放比例&#xff0c;然后在canvas容…

(LeetCode 每日一題) 1353. 最多可以參加的會議數目 (優先隊列、小頂堆)

題目&#xff1a;1353. 最多可以參加的會議數目 思路&#xff1a;優先隊列實現小頂堆&#xff0c;0(mx*logn) 在第i天&#xff0c;優先選endDay最小的那一個活動進行。那么遍歷每一天&#xff0c;用小頂堆來維護每個活動的最后一天即可&#xff0c;細節看注釋。 C版本&#xf…

Java結構型模式---代理模式

代理模式基礎概念代理模式是一種結構型設計模式&#xff0c;其核心思想是通過創建一個代理對象來控制對另一個真實對象的訪問。代理對象在客戶端和真實對象之間起到中介作用&#xff0c;允許在不改變真實對象的前提下&#xff0c;對其進行增強或控制。代理模式的核心組件主題接…

MySQL流程控制函數全解析

MySQL 中的流程控制函數&#xff08;也稱為條件函數&#xff09;允許你在 SQL 語句中進行邏輯判斷&#xff0c;根據不同的條件返回不同的值或執行不同的操作。它們極大地增強了 SQL 的靈活性和表達能力&#xff0c;尤其在進行數據轉換、結果格式化、條件聚合和復雜業務邏輯實現…

【7】PostgreSQL 事務

【7】PostgreSQL 事務前言使用事務事務內錯誤處理事務保存點DDL 事務前言 在 PostgreSQL 中&#xff0c;每一個操作都是一個事務。即使一個簡單的查詢(select)&#xff0c;這也是一個事務。 例如&#xff1a; postgres# select now();now --------------------…

Linux:多線程---深入互斥淺談同步

文章目錄1. 互斥1.1 為什么需要互斥1.2 互斥鎖1.3 初談互斥與同步1.4 鎖的原理1.5 可重入VS線程安全1.6 死鎖1.7 避免死鎖的算法&#xff08;擴展&#xff09;序&#xff1a;在上一章中我們知道了線程控制的三個角度&#xff1a;線程創建、線程等待和線程終止&#xff0c;分別從…

適用于 vue2、vue3 的自定義指定:v-int(正整數)

在項目中&#xff0c;我們經常會遇到輸入框只允許輸入數字的情況&#xff0c;下面是一段自定義指定 代碼&#xff0c;復制到項目中&#xff0c;注冊指定即可使用用法如下&#xff1a; 創建一個IntInput.js 文件&#xff0c;將下面代碼復制到文件中保存在項目中的 main.js 文件中…

學習基于springboot秒殺系統-環境配置(接口封裝,mybatis,mysql,redis(Linux))

文章目錄前言創建springboot項目封裝controller層輸入輸出rest api 的json輸出返回頁面集成mybatis集成redis下載虛擬機和centos下載redis.tar.gz上傳redis.tar.gz 到虛擬機前言 今天開始記錄學習秒殺系統-課程是基于慕課上的搜索秒殺系統的課程&#xff0c;老師講解非常好。這…

stm32達到什么程度叫精通?

STM32達到什么程度叫精通&#xff1f;一個十年老兵的深度反思 前言&#xff1a;精通二字&#xff0c;重如泰山 每次有人問我"STM32達到什么程度叫精通"這個問題&#xff0c;我都會沉默很久。 不是因為這個問題難回答&#xff0c;而是因為"精通"這兩個字太重…

微軟上線Deep Research:OpenAI同款智能體,o3+必應雙王炸

今天凌晨&#xff0c;微軟在官網宣布&#xff0c;Azure AI Foundry中上線Deep Research公開預覽版。這是支持API和SDK的OpenAI 高級智能體研究能力產品&#xff0c;并且Azure 的企業級智能體平臺完全集成。Deep Research是OpenAI在今年4月25日發布的最新產品&#xff0c;能夠像…

Spring Batch終極指南:原理、實戰與性能優化

&#x1f31f; Spring Batch終極指南&#xff1a;原理、實戰與性能優化單機日處理10億數據&#xff1f;揭秘企業級批處理架構的核心引擎&#xff01;一、Spring Batch 究竟是什么&#xff1f;Spring batch是用于創建批處理應用程序&#xff08;執行一系列作業&#xff09;的開源…

【Part 3 Unity VR眼鏡端播放器開發與優化】第四節|高分辨率VR全景視頻播放性能優化

文章目錄《VR 360全景視頻開發》專欄Part 3&#xff5c;Unity VR眼鏡端播放器開發與優化第一節&#xff5c;基于Unity的360全景視頻播放實現方案第二節&#xff5c;VR眼鏡端的開發適配與交互設計第三節&#xff5c;Unity?VR手勢交互開發與深度優化第四節&#xff5c;高分辨率V…

TCP/IP協議基礎

TCPIP協議基礎 網絡模型 -OSI參考模型 -OSI參考模型各層功能 -TCP/IP網絡模型 -TCP/IP協議棧OSI參考模型 – 為了解決網絡設備之間的兼容性問題&#xff0c;國際標準化組織ISO于1984年提出了OSI RM&#xff08;開放系統互連參考模型&#xff09;。 OSI參考模型一共有七層&#…