OpenLineage 是一個用于元數據和血緣采集的開放標準,專為在作業運行時動態采集數據而設計。它通過統一的命名策略定義了由作業(Job)、運行實例(Run)和數據集(Dataset) 組成的通用模型,并通過可擴展的Facets機制對這些實體進行元數據增強。
該項目是 LF AI & Data 基金會的畢業級項目,處于活躍開發階段,歡迎社區貢獻。
使用 Apache Airflow? 和 OpenLineage + Marquez 入門
Getting Started with Apache Airflow? and OpenLineage+Marquez
本教程將指導你配置 Apache Airflow? 以將 OpenLineage 事件發送到 Marquez,并通過一個真實的故障排查場景進行探索。
目錄
- 前提條件
- 獲取并啟動 Marquez
- 配置 Airflow 將 OpenLineage 事件發送到 Marquez
- 編寫 Airflow DAG
- 在 Marquez 中查看已收集的血緣
- 使用 Marquez 排查失敗的 DAG
- 后續步驟
- 反饋
前提條件
開始前,請確保已安裝:
- Docker 17.05+
- Apache Airflow 2.7+ 本地運行。
如需在本地輕松安裝并運行 Airflow 以用于開發,請參閱:快速開始。
獲取并啟動 Marquez
-
創建 Marquez 目錄,然后通過運行以下命令檢出 Marquez 源碼:
MacOS/Linux
git clone https://github.com/MarquezProject/marquez && cd marquez
Windows
git config --global core.autocrlf false git clone https://github.com/MarquezProject/marquez && cd marquez
-
Airflow 和 Marquez 都需要 5432 端口用于其元數據庫,但 Marquez 服務更易于配置。你也可以即時為數據庫服務分配一個新端口。要使用 2345 端口啟動 Marquez,請運行:
MacOS/Linux
./docker/up.sh --db-port 2345
Windows
驗證 Postgres 和 Bash 是否在
PATH
中,然后運行:sh ./docker/up.sh --db-port 2345
-
要查看 Marquez UI 并驗證其運行狀態,請打開 http://localhost:3000。該 UI 允許你:
- 查看跨平臺依賴關系,即你可在生態系統中查看生成或消費關鍵表的工具中的作業。
- 查看當前和先前作業運行的運行級元數據,使你能夠看到作業的最新狀態和數據集的更新歷史。
- 獲取資源使用情況的高級視圖,使你能夠查看操作中的趨勢。
配置 Airflow 將 OpenLineage 事件發送到 Marquez
-
要配置 Airflow 以將 OpenLineage 事件發送到 Marquez,你需要修改本地 Airflow 環境并添加依賴。首先,定義一個 OpenLineage 傳輸。一種方法是使用環境變量。要使用
http
并將事件發送到本地端口5000
上運行的 Marquez API,請運行:MacOS/Linux
export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
Windows
set AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
-
你還需要為 Airflow 作業定義一個命名空間。它可以是任意字符串。請運行:
MacOS/Linux
export AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
Windows
set AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
-
要將所需的 Airflow OpenLineage Provider 包添加到你的 Airflow 環境,請運行:
MacOS/Linux
pip install apache-airflow-providers-openlineage
Windows
pip install apache-airflow-providers-openlineage
-
要完成本教程,你還需要在 Airflow 中啟用本地 Postgres 操作。為此,請運行:
MacOS/Linux
pip install apache-airflow-providers-postgres
Windows
pip install apache-airflow-providers-postgres
-
在本地 Postgres 實例中創建一個數據庫,并使用默認 ID (
postgres_default
) 創建一個 Airflow Postgres 連接。如需前者幫助,請參閱:Postgres 文檔。如需后者幫助,請參閱:管理連接。
編寫 Airflow DAG
在此步驟中,你將創建兩個新的 Airflow DAG,它們執行簡單任務,并將其添加到你現有的 Airflow 實例。counter
DAG 每分鐘將列值加 1,而 sum
DAG 每五分鐘計算一次總和。這將形成一個包含兩個作業和兩個數據集的簡單管道。
-
在
dags/
目錄下,創建一個名為counter.py
的文件,并添加以下代碼:import pendulum from airflow.decorators import dag, task from airflow.providers.postgres.operators.postgres import PostgresOperator from airflow.utils.dates import days_ago@dag(schedule='*/1 * * * *',start_date=days_ago(1),catchup=False,is_paused_upon_creation=False,max_active_runs=1,description='DAG that generates a new count value equal to 1.' )def counter():query1 = PostgresOperator(task_id='if_not_exists',postgres_conn_id='postgres_default',sql='''CREATE TABLE IF NOT EXISTS counts (value INTEGER);''',)query2 = PostgresOperator(task_id='inc',postgres_conn_id='postgres_default',sql='''INSERT INTO "counts" (value) VALUES (1);''',)query1 >> query2counter()
-
在
dags/
目錄下,創建一個名為sum.py
的文件,并添加以下代碼:import pendulum from airflow.decorators import dag, task from airflow.providers.postgres.operators.postgres import PostgresOperator from airflow.utils.dates import days_ago@dag(start_date=days_ago(1),schedule='*/5 * * * *',catchup=False,is_paused_upon_creation=False,max_active_runs=1,description='DAG that sums the total of generated count values.' )def sum():query1 = PostgresOperator(task_id='if_not_exists',postgres_conn_id='postgres_default',sql='''CREATE TABLE IF NOT EXISTS sums (value INTEGER);''')query2 = PostgresOperator(task_id='total',postgres_conn_id='postgres_default',sql='''INSERT INTO sums (value)SELECT SUM(value) FROM counts;''')query1 >> query2sum()
-
重啟 Airflow 以應用更改。然后,取消暫停兩個 DAG。
在 Marquez 中查看已收集的血緣
-
要查看 Marquez 從 Airflow 收集的血緣,請訪問 http://localhost:3000 打開 Marquez UI。然后,使用左上角搜索欄搜索
counter.inc
作業。要查看counter.inc
的血緣元數據,請從下拉列表中點擊該作業: -
查看
counter.inc
的血緣圖,你應看到<database>.public.counts
作為輸出數據集,sum.total
作為下游作業:
使用 Marquez 排查失敗的 DAG
-
在此步驟中,你將模擬由于跨 DAG 依賴項更改導致的管道中斷,并了解來自 OpenLineage + Marquez 的增強血緣如何使架構更改的故障排查變得輕松。
假設
Team A
擁有 DAGcounter
。Team A
更新counter
以將counts
表中的values
列重命名為value_1_to_10
,但未將架構更改正確傳達給擁有sum
的團隊。對
counter
應用以下更改以模擬破壞性更改:query1 = PostgresOperator( - task_id='if_not_exists', + task_id='alter_name_of_column',postgres_conn_id='example_db',sql=''' - CREATE TABLE IF NOT EXISTS counts ( - value INTEGER - );''', + ALTER TABLE "counts" RENAME COLUMN "value" TO "value_1_to_10"; + ''' )
query2 = PostgresOperator(task_id='inc',postgres_conn_id='example_db',sql=''' - INSERT INTO counts (value) + INSERT INTO counts (value_1_to_10)VALUES (1)''', )
正如
sum
的所有者Team B
所做的那樣,注意 Marquez 中 DataOps 視圖的失敗運行:Team B
只能猜測 DAG 失敗的可能原因,因為 DAG 最近沒有更改。因此,團隊決定檢查 Marquez。 -
在 Marquez 中,導航到 Datasets 視圖,并從右上角的命名空間下拉菜單中選擇你的 Postgres 實例。然后,點擊
<database>.public.counts
數據集并檢查圖表。你將在節點上找到架構: -
假設你不認識該列,并希望了解其原始名稱及更改時間。點擊節點將打開詳情抽屜。在那里,使用版本歷史查找架構更改的運行:
-
在 Airflow 中,通過更新計算計數總和的任務以使用新列名來修復中斷的下游 DAG:
query2 = PostgresOperator(task_id='total',postgres_conn_id='example_db',sql=''' - INSERT INTO sums (value) - SELECT SUM(value) FROM counts; + SELECT SUM(value_1_to_10) FROM counts;''' )
-
重新運行 DAG。在 Marquez 中,通過查看 DataOps 視圖中最近的運行歷史來驗證修復:
后續步驟
- 查看用于收集 Airflow DAG 元數據的 Marquez HTTP API,并學習如何使用 OpenLineage 構建自己的集成。
- 查看可與 Airflow 一起使用的
openlineage-spark
集成。
反饋
你覺得本指南如何?請在 OpenLineage Slack 或 Marquez Slack 中告訴我們。你也可以通過 提交拉取請求 直接提出更改。
風險提示與免責聲明
本文內容基于公開信息研究整理,不構成任何形式的投資建議。歷史表現不應作為未來收益保證,市場存在不可預見的波動風險。投資者需結合自身財務狀況及風險承受能力獨立決策,并自行承擔交易結果。作者及發布方不對任何依據本文操作導致的損失承擔法律責任。市場有風險,投資須謹慎。