OpenLineage 是一個用于元數據和血緣采集的開放標準,專為在作業運行時動態采集數據而設計。它通過統一的命名策略定義了由作業(Job)、運行實例(Run)和數據集(Dataset) 組成的通用模型,并通過可擴展的Facets機制對這些實體進行元數據增強。
該項目是 LF AI & Data 基金會的畢業級項目,處于活躍開發階段,歡迎社區貢獻。
在 Airflow 中使用 OpenLineage Proxy
本教程介紹如何將 OpenLineage Proxy 與 Airflow 結合使用。OpenLineage 提供多種集成方案,可在使用 Airflow 集成 時讓 Airflow 發出 OpenLineage 事件。本教程將使用 Docker Compose 運行本地 Airflow 實例,并學習如何啟用和配置 OpenLineage 以發出數據血緣事件。教程將使用兩個后端來查看數據血緣:1)Proxy,2)Marquez。
目錄
- 使用 Docker Compose 搭建本地 Airflow 環境
- 配置 Marquez
- 啟動所有服務
- 訪問 Airflow UI
- 運行示例 DAG
使用 Docker Compose 搭建本地 Airflow 環境
Airflow 提供一種便捷方式,通過 Docker Compose 搭建并運行完整環境。因此,在開始本教程前,需先安裝以下組件。
前提條件
- Docker 20.10.0+
- Docker Desktop
- Docker Compose
- Java 11
若使用 macOS Monterey(macOS 12),需通過禁用 AirPlay 接收器釋放 5000 端口。若需訪問 Marquez Web UI,還需確保 3000 端口空閑。
參考以下指南使用 Docker Compose 搭建并運行 Airflow。
首先,創建一個新目錄,用于存放所有工作文件。
mkdir ~/airflow-ol &&
cd ~/airflow-ol
然后,下載我們將要運行的 Docker Compose 文件。
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.3/docker-compose.yaml'
這將允許向 Docker 容器傳遞新的環境變量 OPENLINEAGE_URL
,OpenLineage 需要該變量才能工作。
接著,創建以下目錄,這些目錄將被掛載并由啟動 Airflow 的 Docker Compose 使用。
mkdir dags &&
mkdir logs &&
mkdir plugins
同時,創建 .env
文件,其中包含 Airflow 用于安裝所需額外 Python 包的環境變量。本教程將安裝 openlineage-airflow
包。
echo "_PIP_ADDITIONAL_REQUIREMENTS=openlineage-airflow" > .env
還需告知 OpenLineage 將血緣數據發送至何處。
echo "OPENLINEAGE_URL=http://host.docker.internal:4433" >> .env
將后端設置為 host.docker.internal
的原因是我們將在主機而非 Airflow 的 Docker 環境中運行 OpenLineage Proxy。代理將在 4433 端口監聽血緣數據。
將 OpenLineage Proxy 配置為接收端
OpenLineage Proxy 是一個簡單工具,可輕松搭建并運行以接收 OpenLineage 數據。代理本身不執行任何操作,僅顯示接收到的數據。可選地,它還可通過 HTTP 將數據轉發至任何兼容的 OpenLineage 后端。
從 git 下載代理代碼并構建:
cd ~ &&
git clone https://github.com/OpenLineage/OpenLineage.git &&
cd OpenLineage/proxy/backend &&
./gradlew build
現在,復制 proxy.dev.yml
并按以下內容編輯,保存為 proxy.yml
。
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.server:applicationConnectors:- type: httpport: ${OPENLINEAGE_PROXY_PORT:-4433}adminConnectors:- type: httpport: ${OPENLINEAGE_PROXY_ADMIN_PORT:-4434}logging:level: ${LOG_LEVEL:-INFO}appenders:- type: consoleproxy:source: openLineageProxyBackendstreams:- type: Console- type: Httpurl: http://localhost:5000/api/v1/lineage
配置 Marquez
最后一步是配置 Marquez 后端。使用 Marquez 的快速開始文檔搭建 Marquez 環境。
cd ~ &&
git clone https://github.com/MarquezProject/marquez.git
在 marquez/docker-compose.dev.yml 中,更改 pghero 的端口,以釋放 8080 端口供 Airflow 使用:
version: "3.7"
services:api:build: .seed_marquez:build: .pghero:image: ankane/pgherocontainer_name: pgheroports:- "8888:8888"environment:DATABASE_URL: postgres://postgres:password@db:5432
啟動所有服務
啟動 Marquez
啟動 Docker Desktop,然后:
cd ~/marquez &&
./docker/up.sh
啟動 OpenLineage proxy
cd ~/OpenLineage/proxy/backend &&
./gradlew runShadow
啟動 Airflow
cd ~/airflow-ol
docker-compose up
此時,Apache Airflow 應已運行,并能夠將血緣數據發送至 OpenLineage Proxy,而 OpenLineage Proxy 將數據轉發至 Marquez。因此,我們既可以檢查數據負載,也可以以圖形形式查看血緣數據。
訪問 Airflow UI
所有服務啟動后,現在可通過瀏覽器訪問 Airflow UI,地址為 http://localhost:8080
。
初始登錄 ID 和密碼為 airflow/airflow
。
運行示例 DAG
登錄 Airflow UI 后,會看到啟動時已預填充的多個示例 DAG。我們可以運行其中一些,以查看它們生成的 OpenLineage 事件。
運行 Bash Operator
在 DAGs 頁面,找到 example_bash_operator
。
點擊右側的 ? 按鈕,將彈出提示框。選擇 Trigger DAG
手動觸發并運行 DAG。
將看到 DAG 運行并最終完成。
檢查 OpenLineage 事件
一切完成后,應在 OpenLineage proxy 的控制臺中看到多條 JSON 數據負載輸出。
INFO [2022-08-16 21:39:41,411] io.openlineage.proxy.api.models.ConsoleLineageStream: {"eventTime" : "2022-08-16T21:39:40.854926Z","eventType" : "START","inputs" : [ ],"job" : {"facets" : { },"name" : "example_bash_operator.runme_2","namespace" : "default"},"outputs" : [ ],"producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","run" : {"facets" : {"airflow_runArgs" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet","externalTrigger" : true},"airflow_version" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet","airflowVersion" : "2.3.3","openlineageAirflowVersion" : "0.12.0","operator" : "airflow.operators.bash.BashOperator","taskInfo" : "{'_BaseOperator__init_kwargs': {'task_id': 'runme_2', 'params': <***.models.param.ParamsDict object at 0xffff7467b610>, 'bash_command': 'echo \"example_bash_operator__runme_2__20220816\" && sleep 1'}, '_BaseOperator__from_mapped': False, 'task_id': 'runme_2', 'task_group': <weakproxy at 0xffff74676ef0 to TaskGroup at 0xffff7467ba50>, 'owner': '***', 'email': None, 'email_on_retry': True, 'email_on_failure': True, 'execution_timeout': None, 'on_execute_callback': None, 'on_failure_callback': None, 'on_success_callback': None, 'on_retry_callback': None, '_pre_execute_hook': None, '_post_execute_hook': None, 'executor_config': {}, 'run_as_user': None, 'retries': 0, 'queue': 'default', 'pool': 'default_pool', 'pool_slots': 1, 'sla': None, 'trigger_rule': <TriggerRule.ALL_SUCCESS: 'all_success'>, 'depends_on_past': False, 'ignore_first_depends_on_past': True, 'wait_for_downstream': False, 'retry_delay': datetime.timedelta(seconds=300), 'retry_exponential_backoff': False, 'max_retry_delay': None, 'params': <***.models.param.ParamsDict object at 0xffff7467b4d0>, 'priority_weight': 1, 'weight_rule': <WeightRule.DOWNSTREAM: 'downstream'>, 'resources': None, 'max_active_tis_per_dag': None, 'do_xcom_push': True, 'doc_md': None, 'doc_json': None, 'doc_yaml': None, 'doc_rst': None, 'doc': None, 'upstream_task_ids': set(), 'downstream_task_ids': {'run_after_loop'}, 'start_date': DateTime(2021, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'end_date': None, '_dag': <DAG: example_bash_operator>, '_log': <Logger ***.task.operators (INFO)>, 'inlets': [], 'outlets': [], '_inlets': [], '_outlets': [], '_BaseOperator__instantiated': True, 'bash_command': 'echo \"example_bash_operator__runme_2__20220816\" && sleep 1', 'env': None, 'output_encoding': 'utf-8', 'skip_exit_code': 99, 'cwd': None, 'append_env': False}"},"nominalTime" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/NominalTimeRunFacet","nominalStartTime" : "2022-08-16T21:39:38.005668Z"},"parentRun" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ParentRunFacet","job" : {"name" : "example_bash_operator","namespace" : "default"},"run" : {"runId" : "39ad10d1-72d9-3fe9-b2a4-860c651b98b7"}}},"runId" : "313b4e71-9cde-4c83-b641-dd6773bf114b"}
}
檢查 Marquez
還可打開瀏覽器訪問 http://localhost:3000
進入 Marquez UI,查看來自 Airflow 的 OpenLineage 事件。
運行其他 DAG
由于教程篇幅限制,此處不再運行其他示例 DAG,但你可以嘗試運行它們,觀察各 DAG 如何發出 OpenLineage 事件。請嘗試運行其他示例,如 example_python_operator
,它也會發出 OpenLineage 事件。
通常,當 DAG 運行涉及某些被使用或創建的 dataset
時,數據血緣會更加完整和有用。運行這些 DAG 后,你將能看到不同 DAG 和任務如何連接同一數據集,最終形成如下所示的數據血緣圖:
以下是目前已具備提取器、能夠提取并發出 OpenLineage 事件的 Airflow 算子:
- PostgresOperator
- MySqlOperator
- BigQueryOperator
- SnowflakeOperator
- GreatExpectationsOperator
- PythonOperator
更多可在 Airflow 中運行的 OpenLineage 示例 DAG,請參閱 Apache 示例。
故障排查
- 若未在 proxy 或 Marquez 中看到任何數據,請檢查 Airflow 的任務日志,查看是否出現以下消息:
[2022-08-16, 21:23:19 UTC] {factory.py:122} ERROR - Did not find openlineage.yml and OPENLINEAGE_URL is not set
。若出現,說明環境變量OPENLINEAGE_URL
未正確設置,導致 OpenLineage 無法發出任何事件。請確保在通過 docker compose 設置 Airflow 時正確設置了環境變量。 - 有時 Marquez 可能無響應,無法通過 API 端口 5000 接收數據。若發現收到 Marquez 的 500 響應碼,或 Marquez UI 卡死,只需停止并重啟 Marquez 即可。
結論
本簡短教程介紹了如何搭建并運行一個簡單的 Apache Airflow 環境,使其在 DAG 運行期間能夠發出 OpenLineage 事件。我們還通過 OpenLineage proxy 與 Marquez 的組合,監控并接收了血緣事件。希望本教程有助于理解如何將 Airflow 與 OpenLineage 結合,以及如何輕松使用 proxy 和 Marquez 監控其數據和最終結果。
風險提示與免責聲明
本文內容基于公開信息研究整理,不構成任何形式的投資建議。歷史表現不應作為未來收益保證,市場存在不可預見的波動風險。投資者需結合自身財務狀況及風險承受能力獨立決策,并自行承擔交易結果。作者及發布方不對任何依據本文操作導致的損失承擔法律責任。市場有風險,投資須謹慎。