血緣元數據采集開放標準:OpenLineage Guides 在 Airflow 中使用 OpenLineage Proxy

OpenLineage

OpenLineage 是一個用于元數據和血緣采集的開放標準,專為在作業運行時動態采集數據而設計。它通過統一的命名策略定義了由作業(Job)、運行實例(Run)和數據集(Dataset) 組成的通用模型,并通過可擴展的Facets機制對這些實體進行元數據增強。
該項目是 LF AI & Data 基金會的畢業級項目,處于活躍開發階段,歡迎社區貢獻。

在 Airflow 中使用 OpenLineage Proxy

本教程介紹如何將 OpenLineage Proxy 與 Airflow 結合使用。OpenLineage 提供多種集成方案,可在使用 Airflow 集成 時讓 Airflow 發出 OpenLineage 事件。本教程將使用 Docker Compose 運行本地 Airflow 實例,并學習如何啟用和配置 OpenLineage 以發出數據血緣事件。教程將使用兩個后端來查看數據血緣:1)Proxy,2)Marquez。

目錄

  • 使用 Docker Compose 搭建本地 Airflow 環境
  • 配置 Marquez
  • 啟動所有服務
  • 訪問 Airflow UI
  • 運行示例 DAG

使用 Docker Compose 搭建本地 Airflow 環境

Airflow 提供一種便捷方式,通過 Docker Compose 搭建并運行完整環境。因此,在開始本教程前,需先安裝以下組件。

前提條件

  • Docker 20.10.0+
  • Docker Desktop
  • Docker Compose
  • Java 11

若使用 macOS Monterey(macOS 12),需通過禁用 AirPlay 接收器釋放 5000 端口。若需訪問 Marquez Web UI,還需確保 3000 端口空閑。

參考以下指南使用 Docker Compose 搭建并運行 Airflow。

首先,創建一個新目錄,用于存放所有工作文件。

mkdir ~/airflow-ol &&
cd ~/airflow-ol

然后,下載我們將要運行的 Docker Compose 文件。

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.3/docker-compose.yaml'

這將允許向 Docker 容器傳遞新的環境變量 OPENLINEAGE_URL,OpenLineage 需要該變量才能工作。

接著,創建以下目錄,這些目錄將被掛載并由啟動 Airflow 的 Docker Compose 使用。

mkdir dags &&
mkdir logs &&
mkdir plugins

同時,創建 .env 文件,其中包含 Airflow 用于安裝所需額外 Python 包的環境變量。本教程將安裝 openlineage-airflow 包。

echo "_PIP_ADDITIONAL_REQUIREMENTS=openlineage-airflow" > .env

還需告知 OpenLineage 將血緣數據發送至何處。

echo "OPENLINEAGE_URL=http://host.docker.internal:4433" >> .env

將后端設置為 host.docker.internal 的原因是我們將在主機而非 Airflow 的 Docker 環境中運行 OpenLineage Proxy。代理將在 4433 端口監聽血緣數據。

將 OpenLineage Proxy 配置為接收端

OpenLineage Proxy 是一個簡單工具,可輕松搭建并運行以接收 OpenLineage 數據。代理本身不執行任何操作,僅顯示接收到的數據。可選地,它還可通過 HTTP 將數據轉發至任何兼容的 OpenLineage 后端。

從 git 下載代理代碼并構建:

cd ~ &&
git clone https://github.com/OpenLineage/OpenLineage.git &&
cd OpenLineage/proxy/backend &&
./gradlew build

現在,復制 proxy.dev.yml 并按以下內容編輯,保存為 proxy.yml

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.server:applicationConnectors:- type: httpport: ${OPENLINEAGE_PROXY_PORT:-4433}adminConnectors:- type: httpport: ${OPENLINEAGE_PROXY_ADMIN_PORT:-4434}logging:level: ${LOG_LEVEL:-INFO}appenders:- type: consoleproxy:source: openLineageProxyBackendstreams:- type: Console- type: Httpurl: http://localhost:5000/api/v1/lineage

配置 Marquez

最后一步是配置 Marquez 后端。使用 Marquez 的快速開始文檔搭建 Marquez 環境。

cd ~ &&
git clone https://github.com/MarquezProject/marquez.git

在 marquez/docker-compose.dev.yml 中,更改 pghero 的端口,以釋放 8080 端口供 Airflow 使用:

version: "3.7"
services:api:build: .seed_marquez:build: .pghero:image: ankane/pgherocontainer_name: pgheroports:- "8888:8888"environment:DATABASE_URL: postgres://postgres:password@db:5432

啟動所有服務

啟動 Marquez

啟動 Docker Desktop,然后:

cd ~/marquez &&
./docker/up.sh

啟動 OpenLineage proxy

cd ~/OpenLineage/proxy/backend &&
./gradlew runShadow

啟動 Airflow

cd ~/airflow-ol
docker-compose up

airflow_dev_setup

此時,Apache Airflow 應已運行,并能夠將血緣數據發送至 OpenLineage Proxy,而 OpenLineage Proxy 將數據轉發至 Marquez。因此,我們既可以檢查數據負載,也可以以圖形形式查看血緣數據。

訪問 Airflow UI

所有服務啟動后,現在可通過瀏覽器訪問 Airflow UI,地址為 http://localhost:8080

初始登錄 ID 和密碼為 airflow/airflow

運行示例 DAG

登錄 Airflow UI 后,會看到啟動時已預填充的多個示例 DAG。我們可以運行其中一些,以查看它們生成的 OpenLineage 事件。

運行 Bash Operator

在 DAGs 頁面,找到 example_bash_operator

airflow_trigger_dag

點擊右側的 ? 按鈕,將彈出提示框。選擇 Trigger DAG 手動觸發并運行 DAG。

將看到 DAG 運行并最終完成。

檢查 OpenLineage 事件

一切完成后,應在 OpenLineage proxy 的控制臺中看到多條 JSON 數據負載輸出。

INFO  [2022-08-16 21:39:41,411] io.openlineage.proxy.api.models.ConsoleLineageStream: {"eventTime" : "2022-08-16T21:39:40.854926Z","eventType" : "START","inputs" : [ ],"job" : {"facets" : { },"name" : "example_bash_operator.runme_2","namespace" : "default"},"outputs" : [ ],"producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","run" : {"facets" : {"airflow_runArgs" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet","externalTrigger" : true},"airflow_version" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet","airflowVersion" : "2.3.3","openlineageAirflowVersion" : "0.12.0","operator" : "airflow.operators.bash.BashOperator","taskInfo" : "{'_BaseOperator__init_kwargs': {'task_id': 'runme_2', 'params': <***.models.param.ParamsDict object at 0xffff7467b610>, 'bash_command': 'echo \"example_bash_operator__runme_2__20220816\" && sleep 1'}, '_BaseOperator__from_mapped': False, 'task_id': 'runme_2', 'task_group': <weakproxy at 0xffff74676ef0 to TaskGroup at 0xffff7467ba50>, 'owner': '***', 'email': None, 'email_on_retry': True, 'email_on_failure': True, 'execution_timeout': None, 'on_execute_callback': None, 'on_failure_callback': None, 'on_success_callback': None, 'on_retry_callback': None, '_pre_execute_hook': None, '_post_execute_hook': None, 'executor_config': {}, 'run_as_user': None, 'retries': 0, 'queue': 'default', 'pool': 'default_pool', 'pool_slots': 1, 'sla': None, 'trigger_rule': <TriggerRule.ALL_SUCCESS: 'all_success'>, 'depends_on_past': False, 'ignore_first_depends_on_past': True, 'wait_for_downstream': False, 'retry_delay': datetime.timedelta(seconds=300), 'retry_exponential_backoff': False, 'max_retry_delay': None, 'params': <***.models.param.ParamsDict object at 0xffff7467b4d0>, 'priority_weight': 1, 'weight_rule': <WeightRule.DOWNSTREAM: 'downstream'>, 'resources': None, 'max_active_tis_per_dag': None, 'do_xcom_push': True, 'doc_md': None, 'doc_json': None, 'doc_yaml': None, 'doc_rst': None, 'doc': None, 'upstream_task_ids': set(), 'downstream_task_ids': {'run_after_loop'}, 'start_date': DateTime(2021, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'end_date': None, '_dag': <DAG: example_bash_operator>, '_log': <Logger ***.task.operators (INFO)>, 'inlets': [], 'outlets': [], '_inlets': [], '_outlets': [], '_BaseOperator__instantiated': True, 'bash_command': 'echo \"example_bash_operator__runme_2__20220816\" && sleep 1', 'env': None, 'output_encoding': 'utf-8', 'skip_exit_code': 99, 'cwd': None, 'append_env': False}"},"nominalTime" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/NominalTimeRunFacet","nominalStartTime" : "2022-08-16T21:39:38.005668Z"},"parentRun" : {"_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.12.0/integration/airflow","_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ParentRunFacet","job" : {"name" : "example_bash_operator","namespace" : "default"},"run" : {"runId" : "39ad10d1-72d9-3fe9-b2a4-860c651b98b7"}}},"runId" : "313b4e71-9cde-4c83-b641-dd6773bf114b"}
}

檢查 Marquez

還可打開瀏覽器訪問 http://localhost:3000 進入 Marquez UI,查看來自 Airflow 的 OpenLineage 事件。

marquez_bash_jobs

運行其他 DAG

由于教程篇幅限制,此處不再運行其他示例 DAG,但你可以嘗試運行它們,觀察各 DAG 如何發出 OpenLineage 事件。請嘗試運行其他示例,如 example_python_operator,它也會發出 OpenLineage 事件。

通常,當 DAG 運行涉及某些被使用或創建的 dataset 時,數據血緣會更加完整和有用。運行這些 DAG 后,你將能看到不同 DAG 和任務如何連接同一數據集,最終形成如下所示的數據血緣圖:

以下是目前已具備提取器、能夠提取并發出 OpenLineage 事件的 Airflow 算子:

  • PostgresOperator
  • MySqlOperator
  • BigQueryOperator
  • SnowflakeOperator
  • GreatExpectationsOperator
  • PythonOperator

更多可在 Airflow 中運行的 OpenLineage 示例 DAG,請參閱 Apache 示例。

故障排查

  • 若未在 proxy 或 Marquez 中看到任何數據,請檢查 Airflow 的任務日志,查看是否出現以下消息:[2022-08-16, 21:23:19 UTC] {factory.py:122} ERROR - Did not find openlineage.yml and OPENLINEAGE_URL is not set。若出現,說明環境變量 OPENLINEAGE_URL 未正確設置,導致 OpenLineage 無法發出任何事件。請確保在通過 docker compose 設置 Airflow 時正確設置了環境變量。
  • 有時 Marquez 可能無響應,無法通過 API 端口 5000 接收數據。若發現收到 Marquez 的 500 響應碼,或 Marquez UI 卡死,只需停止并重啟 Marquez 即可。

結論

本簡短教程介紹了如何搭建并運行一個簡單的 Apache Airflow 環境,使其在 DAG 運行期間能夠發出 OpenLineage 事件。我們還通過 OpenLineage proxy 與 Marquez 的組合,監控并接收了血緣事件。希望本教程有助于理解如何將 Airflow 與 OpenLineage 結合,以及如何輕松使用 proxy 和 Marquez 監控其數據和最終結果。

風險提示與免責聲明
本文內容基于公開信息研究整理,不構成任何形式的投資建議。歷史表現不應作為未來收益保證,市場存在不可預見的波動風險。投資者需結合自身財務狀況及風險承受能力獨立決策,并自行承擔交易結果。作者及發布方不對任何依據本文操作導致的損失承擔法律責任。市場有風險,投資須謹慎。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/95479.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/95479.shtml
英文地址,請注明出處:http://en.pswp.cn/web/95479.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Linux】網絡(中)

目錄1. 序列化和反序列化1.1 序列化1.2 反序列化2. 網絡版本計算器&#xff08;自定義協議&#xff09;3. 再次理解OSI七層模型4. HTTP協議4.1 HTTP協議格式4.2 HTTP的方法4.3 HTTP的狀態碼4.4 HTTP常見Header4.5 長連接和短連接4.6 Cookie5. HTTPS協議5.1 對稱加密和非對稱加密…

AI 寫作實戰:用 GPT-4o+ Claude 3 生成小紅書文案,轉化率提升 30%

引言?AI 寫作開啟小紅書營銷新引擎在社交媒體營銷的浪潮中&#xff0c;小紅書以其獨特的社區氛圍和龐大的年輕用戶群體&#xff0c;成為品牌推廣的關鍵陣地。然而&#xff0c;撰寫既吸引眼球又能高效轉化的文案并非易事&#xff0c;傳統人工編寫不僅耗時費力&#xff0c;還難以…

一個月漲粉30萬,Coze智能體一鍵生成民間傳說爆款視頻,3分鐘上手

最近發現一個賬號&#xff0c;用AI將民間傳說故事轉化為生動視頻&#xff0c;短短一個月漲粉30萬&#xff0c;條均播放 量破百萬。這種視頻制作真的需要專業團隊嗎&#xff1f;今天教大家用Coze智能體工作流&#xff0c;一鍵生成 爆款民間故事視頻&#xff01;工作流功能 用Coz…

Linux arm64 PTE contiguous bit

文章目錄一、簡介1.1 contiguous PTE1.2 demo二、Linux 內核中的實現2.1 宏定義2.2 __create_pgd_mapping2.2.1 alloc_init_cont_pmdinit_pmd2.2.2 alloc_init_cont_pteinit_pte2.3 hugetlbpage2.3.1 find_num_contig2.3.2 num_contig_ptes2.3.3 huge_pte_offset2.3.4 huge_pte…

深入分析 json2(新)與標準的 jsonrpc的區別

這兩個模塊都用于實現 JSON 風格的遠程過程調用&#xff08;RPC&#xff09;接口&#xff0c;但設計哲學、使用方式、安全性和現代化程度有顯著差異。 &#x1f4c2; 對比背景 文件 功能 來源 jsonrpc.py 標準的 JSON-RPC 2.0 兼容接口 Odoo 內核已有邏輯 json2.py 自定…

IO_HW_9_3

一、使用消息隊列實現兩個程序間的相互通信二、思維導圖三、牛客網

fastlio配置與過程中遇到的問題

&#x1f680; Fast-LIO 安裝與運行指南 我之前已經創建并使用原有的工作空間 catkin_ws,如果沒有創建一個。 使用環境 ubantu20.04 ros1 noetic版本 我作的是要在已有的 ~/catkin_ws 中編譯 原版 FAST-LIO&#xff08;來自 HKU-MARS 官方倉庫&#xff09;。 最終下載官方文檔中…

Python 工具: Windows 帶寬監控工具

Python 工具&#xff1a; Windows 帶寬監控工具環境介紹會使用的庫多線程關鍵代碼&#xff1a;系統流量采集&#xff1a;用 psutil 獲取網絡數據概念&#xff1a;網絡流量的“增量”與“總量”代碼中的流量采集邏輯Flask Web框架&#xff1a;搭建后端服務前端部分交互邏輯&…

【Java】Redis(中間件)

一、對Redis的理解Reids是一種基于內存的數據庫&#xff0c;對數據的讀寫操作都在內存中完成&#xff0c;因此讀寫速度非常快&#xff0c;常用于緩存、消息隊列、分布式鎖等場景。除此之外&#xff0c;Redis還支持事務、持久化、Lua腳本、多種集群方案&#xff08;主從復制模式…

【題解】洛谷P1776 寶物篩選 [單調隊列優化多重背包]

二進制優化還是不夠快&#xff0c;如果我們想時間復雜度為 &#xff0c;還得找新的方法。 &#xff08;W 為背包最大可承載量&#xff0c;N 為物品種類數&#xff09; 例題&#xff1a;P1776 寶物篩選 - 洛谷 原來的轉移式很普通&#xff1a; 注意到對于每個 &#xff0c;有…

數據結構_循環隊列_犧牲一個存儲空間_不犧牲額外的存儲空間 Circular Queue(C語言實現_超詳細)

目錄循環隊列的引出區別普通隊列和循環隊列兩種循環隊列的概念循環隊列深入理解題目&#xff1a;此題&#xff0c;分為犧牲一個額外空間和不犧牲一個額外空間不犧牲一個額外空間完成第一步完成第二步完成第三步完成第四步犧牲一個額外空間完成第一步完成第二步完成第三步完成第…

Linux_網絡基礎

?? 歡迎大家來到小傘的大講堂?? &#x1f388;&#x1f388;養成好習慣&#xff0c;先贊后看哦~&#x1f388;&#x1f388; 所屬專欄&#xff1a;LInux_st 小傘的主頁&#xff1a;xiaosan_blog 制作不易&#xff01;點個贊吧&#xff01;&#xff01;謝謝喵&#xff01;&a…

Portainer:Docker可視化管理神器部署與使用攻略

Portainer是一款優秀的Docker可視化管理工具&#xff0c;它提供了簡潔美觀的Web界面&#xff0c;可以通過點擊鼠標輕松管理Docker環境。 一、Portainer簡介 Portainer是一個輕量級的Docker管理界面&#xff0c;具有以下特點&#xff1a; 可視化操作&#xff1a;通過Web界面管…

OVITO3.13.1_ Mac中文_材料科學、物理及化學領域設計的數據可視化和分析軟件_安裝教程

軟件下載 【名稱】&#xff1a;****OVITO3.13.1Mac中文 【大小】&#xff1a;****154M 【語言】&#xff1a;簡體中文 【安裝環境】&#xff1a;****mac 【網站下載鏈接】&#xff1a; https://a-xing.top/3008.html軟件應用 軟件應用 Ovito能做什么&#xff1f; Ovito的功能十…

MySQL 開發避坑:DROP TABLE 前你必須知道的幾件事

MySQL 中刪除表主要使用 DROP TABLE 語句。這是一個需要非常謹慎的操作&#xff0c;因為一旦執行&#xff0c;表結構和表中的所有數據都會被永久刪除。1. 基本語法&#xff1a;刪除單個表sqlDROP TABLE [IF EXISTS] table_name;* DROP TABLE: 核心命令&#xff0c;用于刪除表…

淺談人工智能之阿里云搭建coze平臺

淺談人工智能之阿里云搭建coze平臺 一、部署環境準備 阿里云服務器配置要求 ○ 規格&#xff1a;最低2核CPU 4GB內存&#xff08;推薦4核8GB保障流暢運行&#xff09;&#xff0c;作者原先想要利舊&#xff0c;使用了2核2GB的服務器&#xff0c;但是跑不起來&#xff0c;后來自…

ego(2)---初始軌跡生成后的關鍵點采樣

在初始的多項式軌跡生成后&#xff0c;是要經過一個關鍵點采樣&#xff0c;使用關鍵點來進行后續的 B 樣條曲線擬合的。即&#xff1a;初始多項式擬合->關鍵點采樣->B樣條擬合關鍵點采樣的思路關鍵點采樣使用時間步長 ts 來在初始軌跡方程中取點。在上一步的初始軌跡生成…

專項智能練習(信息安全防護措施)

3.以下屬于網絡安全威脅的是&#xff08;A &#xff09;。 A.非授權訪問、病毒感染、信息泄露、拒絕網絡服務 B.信息泄露、非授權訪問、病毒感染、硬盤損壞 C.信息篡改、非授權訪問、病毒感染、硬盤損壞 D.網絡異常、非授權訪問、信息篡改、病毒感染 解析本題考查網絡安全威脅。…

ubuntu編譯webrtc庫

一. 前言 本文介紹在 ubuntu 下如何通過 webrtc 源碼編譯出指定版本 webrtc.lib 庫&#xff08;以 m94 版本為例&#xff09;。 二. 編譯步驟 1. 下載depot_tools工具 depot_tools 是 Google 用來管理大型項目代碼&#xff08;例如 WebRTC&#xff09;的工具集&#xff0c;它…

基于ZooKeeper實現分布式鎖(Spring Boot接入)及與Kafka實現的對比分析

在分布式系統中,多節點對共享資源的并發訪問往往會引發數據一致性問題,分布式鎖正是解決這一問題的核心組件。本文將從原理出發,詳細講解基于ZooKeeper實現分布式鎖的完整流程,提供Spring Boot接入的可運行代碼,并深入對比其與Kafka實現分布式鎖的異同點及優缺點,幫助開發…